[m-rev.] for review: Use a thread pool to manage threads on the Java backend

Paul Bone paul at bone.id.au
Tue Jul 15 17:14:12 AEST 2014


FOr review by anyone.

Branches: master

---

Use a thread pool to manage threads on the Java backend

Thread pools are often used to reduce poor performance on embarrassingly
parallel work loads.  This isn't immediately necessary on the Java backend
however I've implemented it because:
    + It will be required when we implement parallel conjunctions for the
      Java backend.
    + I want to implement parallel profiling on the Java backend and don't
      want to have to implement this once now, and then re-implement it after
      introducing thread pools later.

We want the thread pool to generally restrict the number of threads that are
in use, this reduces overheads.  However, when one or more tasks become
blocked then it can be desirable to create extra threads, this helps ensure
that all processors are kept busy and that thread pooling doesn't contribute
to any deadlocks itself.  The implementation is a work in prograss and
currently does not implement this second feature.

Java's API provides several different thread pools, see
java.util.concurrent.Executors, none of which are suitable.  Specifically
the fixed thread pool is unsuitable as we want to be able to temporarily
exceed the normal number of threads as explained above; and the cached
thread pools, which are also very similar to the ThreadPoolExecutor class,
do not implement the correct algorithm for determining when a new thread
should be created (they can still perform poorly for embarassingly parallel
workloads).  Additionally we cannot instrument this code as easily for
parallel profiling.

These changes alter the behaviour of Mercury threads on the Java backend in
two ways, they now behave more correctly and more like threads on the C
backends.
    + If a thread throws an exception it is now reported and the program is
      aborted.  Previously it was ignored and let pass to the Java runtime
      where I assume it was reported.
    + The program now exits only after all threads have exited.

The ThreadPool will automatically detect the number of threads to use, or if
the -P flag is given in the MERCURY_OPTIONS environment variable it will
honor that.

java/runtime/MercuryThread.java:
java/runtime/MercuryThreadPool.java:
java/runtime/MercuryWorkerThread.java:
java/runtime/Task.java:
java/runtime/ThreadStatus.java:
    These new classes make up the thread pool.  A MercuryThread is an
    abstract class for Mercury threads, MercuryWorkerThread is a concrete
    subclass of MercuryThread which includes the worker thread behaviour.
    A Task is a computation/closure that has not yet been started, it
    provides some methods not available in Java's generic Runnable and
    Callable classes.  The others should be self-explanatory and all files
    contain documentation.

java/runtime/Getopt.java:
java/runtime/MercuryOptions.java:
    Parse the MERCURY_OPTIONS environment variable for the -P flag.

java/runtime/JavaInternal.java:
    Add support for handling Mercury exceptions, this is used in case a
    worker thread's task (a Mercury thread) throws an exception.

compiler/mlds_to_java.m:
    The main method of the main Java class of an application now starts and
    uses the thread pool to execute main/2.

library/exception.m:
    Export exception reporting code to the Java runtime system.

library/thread.m:
    Use the thread pool for thread.spawn.
---
 compiler/mlds_to_java.m               |  19 +-
 java/runtime/Getopt.java              | 147 +++++++++
 java/runtime/JavaInternal.java        |  57 +++-
 java/runtime/MercuryOptions.java      |  72 +++++
 java/runtime/MercuryThread.java       |  28 ++
 java/runtime/MercuryThreadPool.java   | 578 ++++++++++++++++++++++++++++++++++
 java/runtime/MercuryWorkerThread.java |  92 ++++++
 java/runtime/Task.java                |  78 +++++
 java/runtime/ThreadStatus.java        |  14 +
 library/exception.m                   |  24 ++
 library/thread.m                      |  53 +++-
 11 files changed, 1130 insertions(+), 32 deletions(-)
 create mode 100644 java/runtime/Getopt.java
 create mode 100644 java/runtime/MercuryOptions.java
 create mode 100644 java/runtime/MercuryThread.java
 create mode 100644 java/runtime/MercuryThreadPool.java
 create mode 100644 java/runtime/MercuryWorkerThread.java
 create mode 100644 java/runtime/Task.java
 create mode 100644 java/runtime/ThreadStatus.java

diff --git a/compiler/mlds_to_java.m b/compiler/mlds_to_java.m
index 2ece6ab..a6f43a8 100644
--- a/compiler/mlds_to_java.m
+++ b/compiler/mlds_to_java.m
@@ -2064,19 +2064,12 @@ write_main_driver(Indent, ClassName, !IO) :-
         "jmercury.runtime.JavaInternal.args = args;",
         "jmercury.runtime.JavaInternal.exit_status = 0;",
         "benchmarking.ML_initialise();",
-        "try {",
-        "   " ++ ClassName ++ ".main_2_p_0();",
-        "   jmercury.runtime.JavaInternal.run_finalisers();",
-        "} catch (jmercury.runtime.Exception e) {",
-        "   exception.ML_report_uncaught_exception(",
-        "       (univ.Univ_0) e.exception);",
-        "   if (System.getenv(""MERCURY_SUPPRESS_STACK_TRACE"") == null) {",
-        "       e.printStackTrace(System.err);",
-        "   }",
-        "   if (jmercury.runtime.JavaInternal.exit_status == 0) {",
-        "       jmercury.runtime.JavaInternal.exit_status = 1;",
-        "   }",
-        "}",
+        "Runnable run_main = new Runnable() {",
+        "    public void run() {",
+        "      " ++ ClassName ++ ".main_2_p_0();",
+        "    }",
+        "  };",
+        "jmercury.runtime.MercuryThreadPool.getInstance().runMain(run_main);",
         "java.lang.System.exit(jmercury.runtime.JavaInternal.exit_status);"
     ],
     list.foldl(write_indented_line(Indent + 1), Body, !IO),
diff --git a/java/runtime/Getopt.java b/java/runtime/Getopt.java
new file mode 100644
index 0000000..184d98b
--- /dev/null
+++ b/java/runtime/Getopt.java
@@ -0,0 +1,147 @@
+//
+// Copyright (C) 2014 The Mercury Team 
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+import java.util.*;
+
+/**
+ * Getopt style option passing.
+ * THis is currently very simple, it does only what is necessary.
+ */
+public class Getopt implements Iterable<Getopt.Option>
+{
+    String              option_string;
+    List<Option>        options;
+    List<String>        arguments;
+
+    /**
+     * Create a Getopt object to process the command line options.
+     */
+    public Getopt(String option_string)
+    {
+        this.option_string = option_string;
+        this.options = null;
+    }
+
+    public Iterator<Option> iterator()
+    {
+        if (options == null) {
+            process();
+        }
+
+        return options.iterator();
+    }
+
+    /**
+     * Process the options. this is called automatically and the result is
+     * cached.
+     */
+    public void process()
+    {
+        StringTokenizer tok = new StringTokenizer(option_string);
+        Option          option = null;
+
+        options = new LinkedList<Option>();
+        arguments = new LinkedList<String>();
+
+        while (tok.hasMoreTokens()) {
+            String str;
+
+            str = tok.nextToken();
+            if (str.startsWith("-")) {
+                if (option != null) {
+                    options.add(option);
+                    option = null;
+                }
+                option = new Option(str);
+            } else {
+                if (option != null) {
+                    option.setValue(str);
+                    options.add(option);
+                    option = null;
+                } else {
+                    arguments.add(str);
+                    /*
+                     * The token we've got is an argument not an option,
+                     * Store the remaining strings as arguments and stop
+                     * processing.
+                     */
+                    while (tok.hasMoreTokens()) {
+                        str = tok.nextToken();
+                        arguments.add(str);
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Get the arguments from the command line.  Arguments are not options,
+     * they occur after the options list.
+     */
+    public List<String> getArguments() {
+        return arguments;
+    }
+
+    /**
+     * An option.  Call getopt.nextOption() to get each option.
+     */
+    public static class Option
+    {
+        private String option;
+        private String value;
+   
+        public Option(String option, String value)
+        {
+            this.option = option;
+            this.value = value;
+        }
+
+        public Option(String option)
+        {
+            this(option, null);
+        }
+
+        public void setValue(String value)
+        {
+            this.value = value;
+        }
+        
+        public boolean optionIs(String option)
+        {
+            return (this.option.equals(option) ||
+                    this.option.equals("-" + option) ||
+                    this.option.equals("--" + option));
+        }
+
+        public int getValueInt()
+            throws NumberFormatException
+        {
+            return Integer.parseInt(value);
+        }
+
+        public String toString()
+        {
+            StringBuilder builder = new StringBuilder();
+
+            if (option.length() == 1) {
+                builder.append("-");
+            } else if (!option.startsWith("-")) {
+                // This must be a double dash argument.
+                builder.append("--");
+            }
+            builder.append(option);
+            builder.append(" ");
+            builder.append(value);
+
+            return builder.toString();
+        }
+    }
+}
+
+
diff --git a/java/runtime/JavaInternal.java b/java/runtime/JavaInternal.java
index 0868441..a1c7352 100644
--- a/java/runtime/JavaInternal.java
+++ b/java/runtime/JavaInternal.java
@@ -5,19 +5,23 @@
 //
 // All modifications to this file will require changes to:
 // compiler/mlds_to_java.m
-//
-// At the moment this class is used to store the main module's name (progname),
-// command line arguments and the exit status.  We can't put them in one of the
-// library modules because we need to hold them in a class variable in a top
-// level class.
-//
 
 package jmercury.runtime;
 
+/**
+ * Internals for Mercury's runtime system on the Java backend.
+ * At the moment this class is used to store the main module's name (progname),
+ * command line arguments and the exit status.  We can't put them in one of the
+ * library modules because we need to hold them in a class variable in a top
+ * level class.
+ *
+ * The class also contains utility methods.
+ */
 public class JavaInternal {
     public static java.lang.String      progname;
     public static java.lang.String[]    args;
     public static int                   exit_status;
+    public static ExceptionReporter     exception_reporter;
 
     private static java.util.List<Runnable> finalisers
         = new java.util.ArrayList<Runnable>();
@@ -31,4 +35,45 @@ public class JavaInternal {
             r.run();
         }
     }
+
+    /**
+     * Report uncaught exceptions.
+     * This reports exceptions using the exception reporter object, if it is
+     * set.  Otherwise we make a best effort at reporting it.
+     *
+     * Exception reporting code is written in exception.m which we cannot
+     * link to from here.  When that code starts it should set an exception
+     * reporter object.
+     */
+    public static void reportUncaughtException(jmercury.runtime.Exception e)
+    {
+        if (null != exception_reporter) {
+            exception_reporter.reportUncaughtException(e);
+        } else {
+            System.out.flush();
+            System.err.println("Uncaught exception: " + e.getMessage());
+            System.err.flush();
+        }
+
+        if (shouldPrintStackTrace()) {
+            e.printStackTrace(System.err);
+        }
+        if (exit_status == 0) {
+            exit_status = 1;
+        }
+    }
+
+    /**
+     * Should we print Java stack traces when we catch Mercury exceptions?
+     */
+    public static boolean shouldPrintStackTrace() {
+        return System.getenv("MERCURY_SUPPRESS_STACK_TRACE") == null;
+    }
+
+    /**
+     * Intarface for reporting exceptions.
+     */
+    public interface ExceptionReporter {
+        public void reportUncaughtException(jmercury.runtime.Exception e);
+    }
 }
diff --git a/java/runtime/MercuryOptions.java b/java/runtime/MercuryOptions.java
new file mode 100644
index 0000000..6f4c099
--- /dev/null
+++ b/java/runtime/MercuryOptions.java
@@ -0,0 +1,72 @@
+//
+// Copyright (C) 2014 The Mercury Team 
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+import java.util.List;
+
+/**
+ * Process the MERCURY_OPTIONS environment variable.
+ */
+public class MercuryOptions {
+    
+    private static MercuryOptions instance;
+
+    /**
+     * Get the singleton instance
+     */
+    public static synchronized MercuryOptions getInstance()
+    {
+        if (instance == null) {
+            instance = new MercuryOptions();
+            instance.process();
+        }
+
+        return instance;
+    }
+
+    private int num_processors;
+
+    public MercuryOptions()
+    {
+        // Zero means autodetect
+        num_processors = 0;
+    }
+
+    public void process()
+    {
+        String          options;
+
+        options = System.getenv("MERCURY_OPTIONS");
+        if (options != null) {
+            Getopt getopt = new Getopt(options);
+            List<String> args;
+
+            for (Getopt.Option option : getopt) {
+                if (option.optionIs("P")) {
+                    num_processors = option.getValueInt();
+                } else {
+                    System.err.println("Unrecognized option: " + option);
+                    System.exit(1);
+                }
+            }
+            args = getopt.getArguments();
+            if (args.size() > 0) {
+                System.out.println(
+                    "Error parsing MERCURY_OPTIONS environment variable,"
+                    + " unexpected: " + args.get(0));
+            }
+        }
+    }
+
+    /**
+     * Get the number of processors in the machine.
+     */
+    public int getNumProcessors() {
+        return num_processors;
+    }
+
+}
diff --git a/java/runtime/MercuryThread.java b/java/runtime/MercuryThread.java
new file mode 100644
index 0000000..0e3b37f
--- /dev/null
+++ b/java/runtime/MercuryThread.java
@@ -0,0 +1,28 @@
+//
+// Copyright (C) 2014 The Mercury Team 
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+/**
+ * A thread with some Mercury specific support.
+ */
+public abstract class MercuryThread extends Thread
+{
+    private int                 id;
+
+    /**
+     * Construct a new MercuryThread with the given ID and runnable.
+     * @param name A string that identifies the type of thread.
+     * @param id A numeric identifier (should be unique).
+     */
+    public MercuryThread(String name, int id)
+    {
+        super(name + " " + id);
+        this.id = id;
+    }
+
+}
+
diff --git a/java/runtime/MercuryThreadPool.java b/java/runtime/MercuryThreadPool.java
new file mode 100644
index 0000000..c72e34c
--- /dev/null
+++ b/java/runtime/MercuryThreadPool.java
@@ -0,0 +1,578 @@
+//
+// Copyright (C) 2014 The Mercury Team
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ThreadFactory;
+import java.util.*;
+
+/**
+ * This class manages execution of Mercury threads and tasks.
+ *
+ * The pool attempts to reduce overheads, especially in embarrassingly
+ * parallel workloads, by re-using threads and restricting the overall
+ * number of threads.  By default a thread is created for each hardware
+ * thread available.  If all threads are making progress then no new threads
+ * will be created even if tasks are waiting, reducing overheads.  But if
+ * one or more threads block on a barrier, channel, mvar or semaphore, then
+ * new threads may be created to avoid deadlocks and attempt to keep all
+ * processors busy.
+ *
+ * TODO: Currently the thread pool does not know when a thread is blocked by
+ * a barrier, channel, mvar or semaphore.
+ *
+ * TODO: Currently the thread pool does not know when a thread is blocked in
+ * foreign code or performing IO.
+ *
+ * Java's API provides several different thread pools, see
+ * java.util.concurrent.Executors, none of which are suitable for our needs
+ * so we have implemented our own in this class.  Specifically the fixed
+ * thread pool is unsuitable as we want to be able to temporarily exceed the
+ * normal number of threads to overcome deadlocks (and to keep processors
+ * busy); and the cached thread pools, which are also very similar to the
+ * ThreadPoolExecutor class, create threads (up to a maximum number) even if
+ * all the processors are busy.  Additionally we cannot instrument this code
+ * as easily for parallel profiling.
+ *
+ * The thread pool should be executed by calling getInstance() and run()
+ * from the primordial thread.  run() will exit once all the work has been
+ * completed, it will shutdown the worker threads as it exits.  The
+ * runMain() method provides a convenient wrapper for run which also
+ * executes a task, such as the main/2 predicate of the application.
+ */
+public class MercuryThreadPool
+    implements Runnable
+{
+    private static MercuryThreadPool    instance;
+
+    private MercuryThreadFactory        thread_factory;
+
+    /*
+     * Locking:
+     * Rather than synchronize on the 'this' pointer we create explicit
+     * locks.  The numbers of threads and list of threads are protected by
+     * the threads_lock lock.  The queue of tasks is protected by the
+     * tasks_lock.  Separate condition variables are also used,  This avoids
+     * threads waiting on conditions to not be woken up by a different
+     * condition, for example for example a thread can wait for the pool to
+     * shutdown without being woken when new work arrives.
+     *
+     * Safety: If you acquire more than one lock you must acquire locks in
+     *         this order: tasks_lock, threads_lock then main_loop_lock.  If
+     *         you wait on any condition you must only hold that condition's
+     *         lock.
+     */
+
+    /*
+     * Worker threads
+     */
+    private int                         thread_pool_size;
+    private int                         user_specified_size;
+    /*
+     * The sum of num_threads_* is the total number of threads in the pool
+     */
+    private int                         num_threads_working;
+    private int                         num_threads_waiting;
+    private int                         num_threads_other;
+    private LinkedList<MercuryThread>   threads;
+    private Lock                        threads_lock;
+
+    /*
+     * Tasks
+     */
+    private Deque<Task>             tasks;
+    private boolean                 should_shutdown;
+    private long                    num_tasks_submitted;
+    private long                    num_tasks_completed;
+    private Lock                    tasks_lock;
+    private Condition               thread_wait_for_task_condition;
+
+    /*
+     * Main loop condition.
+     */
+    private Lock                    main_loop_lock;
+    private Condition               main_loop_condition;
+
+    /**
+     * Get the singleton instance for the thread pool.
+     */
+    public synchronized static MercuryThreadPool getInstance()
+    {
+        if (instance == null) {
+            int p = MercuryOptions.getInstance().getNumProcessors();
+            instance = new MercuryThreadPool(p);
+        }
+
+        return instance;
+    }
+
+    /**
+     * Private constructor
+     */
+    private MercuryThreadPool(int size)
+    {
+        thread_factory = new MercuryThreadFactory(this);
+
+        user_specified_size = size;
+        thread_pool_size = size;
+        num_threads_working = 0;
+        num_threads_waiting = 0;
+        num_threads_other = 0;
+        threads = new LinkedList<MercuryThread>();
+        threads_lock = new ReentrantLock();
+
+        tasks = new ArrayDeque<Task>(size*4);
+        should_shutdown = false;
+        num_tasks_submitted = 0;
+        num_tasks_completed = 0;
+        tasks_lock = new ReentrantLock();
+        thread_wait_for_task_condition = tasks_lock.newCondition();
+
+        main_loop_lock = new ReentrantLock();
+        main_loop_condition = main_loop_lock.newCondition();
+    }
+
+    /**
+     * Create a new thread to execute the given task.
+     * @param task The task the new thread should execute.
+     * @return The task.
+     */
+    public void submitExclusiveThread(Task task)
+    {
+        Thread t = thread_factory.newThread(task);
+        t.start();
+    }
+
+    /**
+     * Submit a task for execution.
+     * @param task The task.
+     * @return The task.
+     */
+    public void submit(Task task)
+    {
+        tasks_lock.lock();
+        try {
+            tasks.offer(task);
+            task.scheduled();
+            num_tasks_submitted++;
+            thread_wait_for_task_condition.signal();
+        } finally {
+            tasks_lock.unlock();
+        }
+
+        signalMainLoop();
+    }
+
+    /**
+     * Wait for a task to arrive.
+     * This call will block if there are no tasks available to execute.
+     * @return A task to execute.
+     */
+    public Task workerGetTask()
+            throws InterruptedException
+    {
+        Task    task;
+
+        tasks_lock.lock();
+        try {
+            do {
+                if (tooManyThreadsWaiting()) {
+                    /*
+                     * We already have plenty of threads waiting for
+                     * work.  Ask this one to shutdown.
+                     */
+                    return null;
+                }
+
+                task = tasks.poll();
+                if (task != null) {
+                    return task;
+                }
+
+                /*
+                 * If there are tasks currently being executed that spawn
+                 * other tasks while should_shutdown is true, then there's a
+                 * possibility that this could deadlock as we don't check
+                 * that here.
+                 */
+                if (should_shutdown) {
+                    return null;
+                }
+
+                thread_wait_for_task_condition.await();
+            } while (true);
+        } finally {
+            tasks_lock.unlock();
+        }
+    }
+
+    protected boolean tooManyThreadsWaiting()
+    {
+        if (num_threads_waiting > thread_pool_size) {
+            threads_lock.lock();
+            // Recheck with lock.
+            try {
+                return num_threads_waiting > thread_pool_size;
+            } finally {
+                threads_lock.unlock();
+            }
+        } else {
+            return false;
+        }
+    }
+
+    public void updateThreadCounts(ThreadStatus old, ThreadStatus new_)
+    {
+        threads_lock.lock();
+        try {
+            switch (old) {
+                case WORKING:
+                    num_threads_working--;
+                    break;
+                case IDLE:
+                    num_threads_waiting--;
+                    break;
+                case OTHER:
+                    num_threads_other--;
+                    break;
+                default:
+                    assert false : old;
+            }
+
+            switch (new_) {
+                case WORKING:
+                    num_threads_working++;
+                    break;
+                case IDLE:
+                    num_threads_waiting++;
+                    break;
+                case OTHER:
+                    num_threads_other++;
+                    break;
+                default:
+                    assert false : new_;
+            }
+        } finally {
+            threads_lock.unlock();
+        }
+
+        if ((new_ == ThreadStatus.IDLE) ||
+            (new_ == ThreadStatus.OTHER))
+        {
+            signalMainLoop();
+        }
+    }
+
+    public void threadShutdown(MercuryWorkerThread thread,
+        ThreadStatus state)
+    {
+        threads_lock.lock();
+        try {
+            switch (state) {
+                case WORKING:
+                    num_threads_working--;
+                    break;
+                case IDLE:
+                    num_threads_waiting--;
+                    break;
+                case OTHER:
+                    num_threads_other--;
+                    break;
+                default:
+                    assert false : state;
+            }
+            threads.remove(thread);
+        } finally {
+            threads_lock.unlock();
+        }
+    }
+
+    public void taskDone(Task task)
+    {
+        incrementCompletedTasks();
+    }
+
+    public void taskFailed(Task task, Exception e)
+    {
+        incrementCompletedTasks();
+    }
+
+    protected void incrementCompletedTasks()
+    {
+        long num_tasks_submitted;
+        long num_tasks_completed;
+
+        tasks_lock.lock();
+        try {
+            this.num_tasks_completed++;
+            num_tasks_submitted = this.num_tasks_submitted;
+            num_tasks_completed = this.num_tasks_completed;
+        } finally {
+            tasks_lock.unlock();
+        }
+
+        if (num_tasks_submitted == num_tasks_completed) {
+            signalMainLoop();
+        }
+    }
+
+    protected void signalMainLoop()
+    {
+        main_loop_lock.lock();
+        try {
+            main_loop_condition.signal();
+        } finally {
+            main_loop_lock.unlock();
+        }
+    }
+
+    /**
+     * Check threads.
+     * Chacks the numbers and status of the worker threads and starts more
+     * threads if required.
+     * @return The number of currently working/blocked threads.
+     */
+    protected int checkThreads()
+    {
+        int num_new_threads;
+        int num_working_blocked_threads;
+        List<MercuryWorkerThread> new_threads =
+            new LinkedList<MercuryWorkerThread>();
+
+        /*
+         * If necessary poll the Java runtime to see if the number of
+         * available processors has changed.  I don't know if this actually
+         * changes in practice however the Java API says that one can and
+         * should poll it.
+         */
+        thread_pool_size = user_specified_size > 0 ? user_specified_size :
+            Runtime.getRuntime().availableProcessors();
+
+        /*
+         * If we have fewer than the default number of threads then start
+         * some new threads.
+         */
+        threads_lock.lock();
+        try {
+            int num_threads = num_threads_working + num_threads_waiting +
+                num_threads_other;
+            num_working_blocked_threads = num_threads_working +
+                num_threads_other;
+            num_new_threads = thread_pool_size - num_threads;
+            if (num_new_threads > 0) {
+                for (int i = 0; i < num_new_threads; i++) {
+                    MercuryWorkerThread t = thread_factory.newWorkerThread();
+                    new_threads.add(t);
+                    threads.add(t);
+                }
+                num_threads = thread_pool_size;
+            }
+            num_threads_other += num_new_threads;
+        } finally {
+            threads_lock.unlock();
+        }
+
+        /*
+         * Start the threads while we're not holding the lock, this makes
+         * the above critical section smaller.
+         */
+        for (MercuryWorkerThread t : new_threads) {
+            t.start();
+        }
+
+        /*
+         * If there are too many threads then superflous threads will
+         * shutdown when they try to get a new task.
+         */
+        return num_working_blocked_threads;
+    }
+
+    /**
+     * Get the number of currently queued tasks.
+     * @return true if all tasks have been completed.
+     */
+    protected boolean checkTasks()
+    {
+        long    num_tasks_submitted;
+        long    num_tasks_completed;
+        boolean done;
+
+        tasks_lock.lock();
+        try {
+            num_tasks_submitted = this.num_tasks_submitted;
+            num_tasks_completed = this.num_tasks_completed;
+
+            done = (num_tasks_submitted == num_tasks_completed);
+        } finally {
+            tasks_lock.unlock();
+        }
+
+        return done;
+    }
+
+    /**
+     * Run the thread pool.  This is usually called by runMain()
+     */
+    public void run()
+    {
+        boolean done = false;
+        int     num_working_blocked_threads;
+        long    num_tasks_submitted;
+        long    num_tasks_completed;
+        boolean tasks_locked = false;
+        boolean main_loop_locked = false;
+
+        try {
+            do {
+                /*
+                 * Have all the tasks been completed?
+                 */
+                tasks_lock.lock();
+                try {
+                    num_tasks_submitted = this.num_tasks_submitted;
+                    num_tasks_completed = this.num_tasks_completed;
+                    done = (num_tasks_submitted == num_tasks_completed);
+                    
+                    if (!done) {
+                        /*
+                         * Start new threads if we have fewer than the
+                         * thread_pool_size
+                         */
+                        num_working_blocked_threads = checkThreads();
+                    
+                        /*
+                         * Acquire the main loop lock while we're still
+                         * holding tasks_lock.  This prevents a race whereby
+                         * we release the locks below and then the last task
+                         * finishes but we don't get its signal on
+                         * main_loop_condition because we weren't holding
+                         * the lock.
+                         *
+                         * To preserve the locking order we must NOT
+                         * reacquire tasks_lock after releasing them while
+                         * still holding the main loop lock.  This must also
+                         * be executed after checkThreads();
+                         */
+                        main_loop_lock.lock();
+                        main_loop_locked = true;
+                    }
+                } finally {
+                    tasks_lock.unlock();
+                    tasks_locked = false;
+                }
+
+                if (!done) {
+                    if (!main_loop_locked) {
+                        main_loop_lock.lock();
+                        main_loop_locked = true;
+                    }
+                    try {
+                        main_loop_condition.await();
+                    } catch (InterruptedException e) {
+                        continue;
+                    } finally {
+                        main_loop_lock.unlock();
+                        main_loop_locked = false;
+                    }
+                }
+
+                /*
+                 * If we don't execute the if branch above, then the
+                 * main_lock cannot be held because neither of the two
+                 * places where it could have been acquired would be
+                 * executed because done == true.
+                 */
+            } while (!done);
+        } finally {
+            if (main_loop_locked) {
+                main_loop_lock.unlock();
+            }
+            if (tasks_locked) {
+                tasks_lock.unlock();
+            }
+        }
+
+        shutdown();
+    }
+
+    protected void shutdown()
+    {
+        tasks_lock.lock();
+        try {
+            should_shutdown = true;
+            thread_wait_for_task_condition.signalAll();
+        } finally {
+            tasks_lock.unlock();
+        }
+
+    }
+
+    /**
+     * Run the main thread and wait for it's completion.
+     */
+    public void runMain(Runnable run_main)
+    {
+        Task main_task = new Task(run_main);
+        submit(main_task);
+        try {
+            /*
+             * This thread (the primordial thread) operates the thread pool
+             * until the program is finished
+             */
+            run();
+            jmercury.runtime.JavaInternal.run_finalisers();
+        } catch (jmercury.runtime.Exception e) {
+            JavaInternal.reportUncaughtException(e);
+        }
+    }
+
+    /**
+     * This class creates and names Mercury threads.
+     * The factory is responsible for creating threads with unique IDs,
+     */
+    private static class MercuryThreadFactory implements ThreadFactory
+    {
+        public MercuryThreadPool    pool;
+        public volatile int         next_thread_id;
+
+        /**
+         * Create a new thread factory.
+         */
+        public MercuryThreadFactory(MercuryThreadPool pool)
+        {
+            this.pool = pool;
+            next_thread_id = 0;
+        }
+
+        /**
+         * Create a new thread to execute the given task.
+         * @param runnable The task the new thread should execute.
+         */
+        public MercuryThread newThread(final Runnable runnable) {
+            return new MercuryThread("Mercury Thread", allocateThreadId())
+                {
+                    public void run() {
+                        runnable.run();
+                    }
+                };
+        }
+
+        public MercuryWorkerThread newWorkerThread() {
+            return new MercuryWorkerThread(pool, allocateThreadId());
+        }
+
+        /**
+         * Allocate a unique ID for a thread.
+         */
+        protected synchronized int allocateThreadId() {
+            return next_thread_id++;
+        }
+    }
+}
+
diff --git a/java/runtime/MercuryWorkerThread.java b/java/runtime/MercuryWorkerThread.java
new file mode 100644
index 0000000..b0aa0a2
--- /dev/null
+++ b/java/runtime/MercuryWorkerThread.java
@@ -0,0 +1,92 @@
+//
+// Copyright (C) 2014 The Mercury Team 
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+/**
+ * Threads for the Mercury code running in Java.
+ */
+public class MercuryWorkerThread extends MercuryThread
+{
+    private MercuryThreadPool   pool;
+
+    private ThreadStatus        status;
+
+    /**
+     * Construct a new MercuryThread with the given ID and runnable.
+     * @param pool The Mercury thread pool.
+     * @param id A numeric identifier (should be unique).
+     */
+    public MercuryWorkerThread(MercuryThreadPool pool, int id)
+    {
+        super("Mercury Worker Thread", id);
+        this.pool = pool;
+        this.status = ThreadStatus.OTHER;
+    }
+
+    /**
+     * Run.
+     * The worker thread executes tasks that it retrives from the pool.
+     */
+    public void run()
+    {
+        Task task;
+
+        do {
+            task = null;
+            try {
+                if (status != ThreadStatus.IDLE) {
+                    setStatusIdle();
+                }
+                task = pool.workerGetTask();
+            }
+            catch (InterruptedException e) {
+                /*
+                ** A worker thread has no semantics for this, so we continue
+                ** looping.
+                */
+                continue;
+            }
+            if (task != null) {
+                try {
+                    setStatusWorking();
+                    task.run();
+                    pool.taskDone(task);
+                } catch (jmercury.runtime.Exception e) {
+                    // The task threw a Mercury exception.
+                    pool.taskFailed(task, e);
+                    JavaInternal.reportUncaughtException(e);
+                } catch (Throwable e) {
+                    // Some other error occured. bail out.
+                    System.err.println("Uncaught exception: " + e.toString());
+                    System.err.println(e.getMessage());
+                    e.printStackTrace();
+                    System.exit(1);
+                } finally {
+                    setStatusOther();
+                }
+            }
+        } while (task != null);
+
+        pool.threadShutdown(this, status);
+    }
+
+    protected void setStatusIdle() {
+        pool.updateThreadCounts(status, ThreadStatus.IDLE);
+        status = ThreadStatus.IDLE;
+    }
+
+    protected void setStatusWorking() {
+        pool.updateThreadCounts(status, ThreadStatus.WORKING);
+        status = ThreadStatus.WORKING;
+    }
+
+    protected void setStatusOther() {
+        pool.updateThreadCounts(status, ThreadStatus.OTHER);
+        status = ThreadStatus.OTHER;
+    }
+}
+
diff --git a/java/runtime/Task.java b/java/runtime/Task.java
new file mode 100644
index 0000000..c0e3ff4
--- /dev/null
+++ b/java/runtime/Task.java
@@ -0,0 +1,78 @@
+//
+// Copyright (C) 2014 The Mercury Team 
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+/**
+ * Task is a task being managed by MercuryThreadPool.
+ * Callers can use this object to wait for the task's completion.
+ */
+public class Task implements Runnable
+{
+    private static long next_id;
+
+    private long        id;
+    private Runnable    target;
+    private Status      status;
+
+    public enum Status {
+        NEW,
+        SCHEDULED,
+        RUNNING,
+        FINISHED
+    }
+
+    /**
+     * Create a new task.
+     */
+    public Task(Runnable target) {
+        id = allocateTaskId();
+        this.target = target;
+        status = Status.NEW;
+    }
+
+    private static synchronized long allocateTaskId() {
+        return next_id++;
+    }
+
+    public void run() {
+        updateStatus(Status.RUNNING);
+        target.run();
+        updateStatus(Status.FINISHED);
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public void scheduled() {
+        updateStatus(Status.SCHEDULED);
+    }
+
+    /**
+     * Update the task's status and notify any threads waiting on the
+     * status change.
+     */
+    protected synchronized void updateStatus(Status status) {
+        this.status = status;
+        notifyAll();
+    }
+
+    /**
+     * Wait for the task to complete.
+     * This waits on the task's monitor.  Callers should not be holding any
+     * other monitors.
+     */
+    public synchronized void waitForTask()
+        throws InterruptedException
+    {
+        while (status != Status.FINISHED) {
+            wait();
+        }
+    }
+}
+
+
diff --git a/java/runtime/ThreadStatus.java b/java/runtime/ThreadStatus.java
new file mode 100644
index 0000000..4ba0294
--- /dev/null
+++ b/java/runtime/ThreadStatus.java
@@ -0,0 +1,14 @@
+//
+// Copyright (C) 2014 The Mercury Team 
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+public enum ThreadStatus {
+    WORKING,
+    IDLE,
+    OTHER
+}
+
diff --git a/library/exception.m b/library/exception.m
index cd1507b..2c611bf 100644
--- a/library/exception.m
+++ b/library/exception.m
@@ -2919,4 +2919,28 @@ now_near_stack_limits :-
     semidet_fail.
 
 %-----------------------------------------------------------------------------%
+
+    % The Java runtime system sometiems wants to report exceptions, give it
+    % a reference that it can use to call library code to report exceptions.
+    %
+:- pragma foreign_code("Java", "
+
+public static class ReportUncaughtException
+        implements jmercury.runtime.JavaInternal.ExceptionReporter
+{
+    public void reportUncaughtException(jmercury.runtime.Exception e)
+    {
+        ML_report_uncaught_exception((univ.Univ_0) e.exception);
+    }
+}
+
+static {
+    if (null == jmercury.runtime.JavaInternal.exception_reporter) {
+        jmercury.runtime.JavaInternal.exception_reporter =
+            new ReportUncaughtException();
+    }
+}
+").
+
+%-----------------------------------------------------------------------------%
 %-----------------------------------------------------------------------------%
diff --git a/library/thread.m b/library/thread.m
index 4e3739b..676fd4e 100644
--- a/library/thread.m
+++ b/library/thread.m
@@ -122,6 +122,11 @@
 #endif
 ").
 
+:- pragma foreign_decl("Java", "
+import jmercury.runtime.MercuryThreadPool;
+import jmercury.runtime.Task;
+").
+
     % The thread id is not formally exposed yet but allows different thread
     % handles to compare unequal.
     %
@@ -172,6 +177,13 @@ can_spawn_context :-
 ").
 
 :- pragma foreign_proc("Java",
+    can_spawn_context,
+    [will_not_call_mercury, promise_pure],
+"
+    SUCCESS_INDICATOR = true;
+").
+
+:- pragma foreign_proc("Java",
     can_spawn_native,
     [will_not_call_mercury, promise_pure],
 "
@@ -263,6 +275,20 @@ spawn_context_2(_, no, "", !IO) :-
 #endif /* MR_HIGHLEVEL_CODE */
 ").
 
+:- pragma foreign_proc("Java",
+    spawn_context_2(Goal::(pred(in, di, uo) is cc_multi), Success::out,
+        ThreadId::out, _IO0::di, _IO::uo),
+    [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
+        may_not_duplicate],
+"
+    RunGoal rg = new RunGoal((Object[]) Goal);
+    Task task = new Task(rg);
+    ThreadId = String.valueOf(task.getId());
+    rg.setId(ThreadId);
+    MercuryThreadPool.getInstance().submitExclusiveThread(task);
+    Success = bool.YES;
+").
+
 %-----------------------------------------------------------------------------%
 
 spawn_native(Goal, Res, !IO) :-
@@ -320,11 +346,11 @@ spawn_native(Goal, Res, !IO) :-
     [promise_pure, will_not_call_mercury, thread_safe, tabled_for_io,
         may_not_duplicate],
 "
-    final MercuryThread mt = new MercuryThread((Object[]) Goal);
-    final Thread thread = new Thread(mt);
-    ThreadId = String.valueOf(thread.getId());
-    mt.setThreadId(ThreadId);
-    thread.start();
+    RunGoal rg = new RunGoal((Object[]) Goal);
+    Task task = new Task(rg);
+    ThreadId = String.valueOf(task.getId());
+    rg.setId(ThreadId);
+    MercuryThreadPool.getInstance().submit(task);
     Success = bool.YES;
 ").
 
@@ -656,23 +682,24 @@ private class MercuryThread {
 }").
 
 :- pragma foreign_code("Java", "
-private static class MercuryThread implements Runnable {
-    private final Object[] Goal;
-    private String ThreadId;
+public static class RunGoal implements Runnable {
+    private final Object[]  goal;
+    private String          id;
 
-    private MercuryThread(Object[] g)
+    public RunGoal(Object[] g)
     {
-        Goal = g;
+        goal = g;
+        id = null;
     }
 
-    private void setThreadId(String id)
+    public void setId(String id)
     {
-        ThreadId = id;
+        this.id = id;
     }
 
     public void run()
     {
-        thread.ML_call_back_to_mercury_cc_multi(Goal, ThreadId);
+        thread.ML_call_back_to_mercury_cc_multi(goal, id);
     }
 }").
 
-- 
2.0.0




More information about the reviews mailing list