[m-rev.] diff 3/3: [java] The thread pool now works when Mercury is used as

Paul Bone paul at bone.id.au
Tue Dec 16 10:15:18 AEDT 2014


Branches: master

The concept and names of the classes and new method have already been
improved.  So no review is necessary.

---

[java] The thread pool now works when Mercury is used as a library

The thread pool code used in the Java backend was tied the execution of
main/2.  However if Mercury is used as a library the thread pool won't have
been started and threads created with thread.spawn would not be executed.

This patch makes it possible to start and stop the thread pool independently of
main/2 by calling startup() and shutdown().  These calls are called
implicitly by calling runMain().  The thread pool can also be started on
demand.

This patch also adds the MercuryRuntime class, which now contains methods
that may be called by users' Java code to interact with the Mercury runtime
system, including a new finalise() method.

java/runtime/MercuryThreadPool.java:
    Add startup() method.

    shutdown() method is now public and it's meaning has changed, it now
    requests the shutdown rather than performing it.

    Renamed some variables to make their meanings clearer.

java/runtime/JavaInternal.java:
    Initialise the ThreadPool and MercuryOptions objects on demand.

    Make all members of this class static to avoid confusion.

    Add a private constructor.

java/runtime/MercuryRuntime.java:
    Add methods that can be called by Mercury users to interact with the
    runtime system.  Including a convenient finalise() method that does all
    the finalisation.

samples/java_interface/standalone_java/mercury_lib.m:
samples/java_interface/standalone_java/JavaMain.java:
    Extend the standalone Java example so that it makes use of threads: Add
    a fibs function in Mercury that uses concurrency and therefore starts
    the thread pool; call it from the Java code.

    Use the new finalise() method from the MercuryRuntime class inside of a
    finally block.

samples/java_interface/standalone_java/Makefile:
    Fix a minor error.
---
 java/runtime/JavaInternal.java                     |  49 ++++---
 java/runtime/MercuryRuntime.java                   |  59 +++++++++
 java/runtime/MercuryThreadPool.java                | 147 ++++++++++++++++++---
 .../java_interface/standalone_java/JavaMain.java   |  51 ++++---
 samples/java_interface/standalone_java/Makefile    |   2 +-
 .../java_interface/standalone_java/mercury_lib.m   |  42 ++++++
 6 files changed, 294 insertions(+), 56 deletions(-)
 create mode 100644 java/runtime/MercuryRuntime.java

diff --git a/java/runtime/JavaInternal.java b/java/runtime/JavaInternal.java
index 307dfcf..2f9ac75 100644
--- a/java/runtime/JavaInternal.java
+++ b/java/runtime/JavaInternal.java
@@ -1,5 +1,6 @@
 //
 // Copyright (C) 2001-2003, 2009 The University of Melbourne.
+// Copyright (C) 2014 The Mercury Team.
 // This file may only be copied under the terms of the GNU Library General
 // Public License - see the file COPYING.LIB in the Mercury distribution.
 //
@@ -9,33 +10,46 @@
 package jmercury.runtime;
 
 /**
- * Internals for Mercury's runtime system on the Java backend.
- * At the moment this class is used to store the main module's name (progname),
- * command line arguments and the exit status.  We can't put them in one of the
+ * Internals and static objects for Mercury's runtime system on the Java
+ * backend.
+ * This class is used to store the main module's name (progname), command
+ * line arguments and the exit status.  We can't put them in one of the
  * library modules because we need to hold them in a class variable in a top
  * level class.
  *
- * The class also contains utility methods.
+ * The class also contains utility methods and other objects such as a
+ * reference to the thread pool.
+ *
+ * No instance of this class is ever created, all it's members and methods
+ * are static.
  */
 public class JavaInternal {
 
-    private static JavaInternal         instance;
-
+    /**
+     * Private constructor.
+     * This private constructor doesn't do anything and isn't called by
+     * anyone.  It exists only to prevent people from creating an instance.
+     */
     private JavaInternal() {
-        options = new MercuryOptions();
-        options.process();
-        thread_pool = new MercuryThreadPool(options.getNumProcessors());
     }
 
-    private MercuryThreadPool thread_pool;
-    private MercuryOptions options;
+    private static MercuryThreadPool    thread_pool = null;
+    private static MercuryOptions       options = null;
 
-    public static MercuryThreadPool getThreadPool() {
-        return instance.thread_pool;
+    public static synchronized MercuryThreadPool getThreadPool() {
+        if (thread_pool == null) {
+            thread_pool = new MercuryThreadPool(
+                getOptions().getNumProcessors());
+        }
+        return thread_pool;
     }
 
-    public static MercuryOptions getOptions() {
-        return instance.options;
+    public static synchronized MercuryOptions getOptions() {
+        if (options == null) {
+            options = new MercuryOptions();
+            options.process();
+        }
+        return options;
     }
 
     public static java.lang.String      progname;
@@ -57,11 +71,12 @@ public class JavaInternal {
     }
 
     /**
-     * Run the main task using the thread pool.
+     * Run the main task.
+     * The maun task is executed by the thread pool so that when it blocks
+     * the thread pool is notified correctly.
      */
     public static void runMain(Runnable main)
     {
-        instance = new JavaInternal();
         getThreadPool().runMain(main);
     }
 
diff --git a/java/runtime/MercuryRuntime.java b/java/runtime/MercuryRuntime.java
new file mode 100644
index 0000000..5eaa927
--- /dev/null
+++ b/java/runtime/MercuryRuntime.java
@@ -0,0 +1,59 @@
+//
+// Copyright (C) 2014 The Mercury Team
+// This file may only be copied under the terms of the GNU Library General
+// Public License - see the file COPYING.LIB in the Mercury distribution.
+//
+
+package jmercury.runtime;
+
+/**
+ * Interface to the Mercury Runtime System for Java code.
+ *
+ * No instance of this class is ever created, all it's members and methods
+ * are static.
+ */
+public class MercuryRuntime
+{
+    /**
+     * Private constructor.
+     * This private constructor doesn't do anything and isn't called by
+     * anyone.  It exists only to prevent people from creating an instance.
+     */
+    private MercuryRuntime() {
+    }
+
+    private static MercuryThreadPool    thread_pool = null;
+
+    /**
+     * Return the thread pool, initalising it if required.
+     * This does not start the thread pool.  It is started either when
+     * startup() is called or automatically when the first task is
+     * submitted.
+     */
+    public static synchronized MercuryThreadPool getThreadPool()
+    {
+        if (thread_pool == null) {
+            thread_pool = new MercuryThreadPool(
+                JavaInternal.getOptions().getNumProcessors());
+        }
+        return thread_pool;
+    }
+
+    /**
+     * Retrive the exit status stored in the IO state.
+     */
+    public static int getExitStatus() {
+        return JavaInternal.exit_status;
+    }
+
+    /**
+     * Finalise the runtime system.
+     * This _must_ be cAlled at the normal end of any program.  Currently
+     * it runs finalisers and stops the thread pool.
+     */
+    public static void finalise() {
+        JavaInternal.run_finalisers();
+        getThreadPool().shutdown();
+    }
+}
+
diff --git a/java/runtime/MercuryThreadPool.java b/java/runtime/MercuryThreadPool.java
index 4b63460..be121a2 100644
--- a/java/runtime/MercuryThreadPool.java
+++ b/java/runtime/MercuryThreadPool.java
@@ -86,12 +86,19 @@ public class MercuryThreadPool
      * Tasks
      */
     private Deque<Task>             tasks;
-    private boolean                 should_shutdown;
     private long                    num_tasks_submitted;
     private long                    num_tasks_completed;
     private Lock                    tasks_lock;
     private Condition               thread_wait_for_task_condition;
 
+    // Has a shutdown request been received (protected by tasks_lock)
+    private boolean                 shutdown_request;
+    // True if worker threads should exit (the pool is shutting down).
+    private boolean                 shutdown_now;
+    // True if the thread pool is running (including starting up and
+    // shutting down).
+    private boolean                 running;
+
     /*
      * Main loop condition.
      */
@@ -121,7 +128,9 @@ public class MercuryThreadPool
          * ArrayDeque task will grow as needed.
          */
         tasks = new ArrayDeque<Task>(size*4);
-        should_shutdown = false;
+        shutdown_request = false;
+        shutdown_now = false;
+        running = false;
         num_tasks_submitted = 0;
         num_tasks_completed = 0;
         tasks_lock = new ReentrantLock();
@@ -155,6 +164,9 @@ public class MercuryThreadPool
             task.scheduled();
             num_tasks_submitted++;
             thread_wait_for_task_condition.signal();
+            if (!running) {
+                startup();
+            }
         } finally {
             tasks_lock.unlock();
         }
@@ -194,7 +206,7 @@ public class MercuryThreadPool
                  * there's a possibility that this could deadlock as we
                  * don't check that here.
                  */
-                if (should_shutdown) {
+                if (shutdown_now) {
                     return null;
                 }
 
@@ -341,8 +353,9 @@ public class MercuryThreadPool
     }
 
     /**
-     * Warm up the thread pool by starting some initial threads (currently
-     * one).
+     * Warm up the thread pool by starting some initial threads.
+     * Currently starts a single thread, other threads are started on
+     * demand.
      */
     protected void startupInitialThreads()
     {
@@ -439,13 +452,16 @@ public class MercuryThreadPool
     }
 
     /**
-     * Run the thread pool.  This is usually called by runMain()
+     * Run the thread pool.
+     * The calling thread is used to "run" the thread pool.  It's main job
+     * is to keep the correct number of worker threads alive.  It does not
+     * return until the thread pool is stopped (with a call to shutdown()).
+     * run() is usually called by runMain(), and shutdown() is usually
+     * called by the main task itself.
      */
     public void run()
     {
-        boolean done = false;
-        long    num_tasks_submitted;
-        long    num_tasks_completed;
+        boolean will_shutdown = false;
         boolean tasks_locked = false;
         boolean main_loop_locked = false;
 
@@ -457,11 +473,17 @@ public class MercuryThreadPool
                 tasks_lock.lock();
                 tasks_locked = true;
                 try {
+                    boolean okay_to_shutdown;
+                    long    num_tasks_submitted;
+                    long    num_tasks_completed;
+
                     num_tasks_submitted = this.num_tasks_submitted;
                     num_tasks_completed = this.num_tasks_completed;
-                    done = (num_tasks_submitted == num_tasks_completed);
+                    okay_to_shutdown =
+                        (num_tasks_submitted == num_tasks_completed);
+                    will_shutdown = okay_to_shutdown && shutdown_request;
 
-                    if (!done) {
+                    if (!will_shutdown) {
                         /*
                          * Start new threads if we have fewer than the
                          * thread_pool_size
@@ -489,7 +511,7 @@ public class MercuryThreadPool
                     tasks_locked = false;
                 }
 
-                if (!done) {
+                if (!will_shutdown) {
                     if (!main_loop_locked) {
                         main_loop_lock.lock();
                         main_loop_locked = true;
@@ -510,7 +532,7 @@ public class MercuryThreadPool
                  * places where it could have been acquired would be
                  * executed because done == true.
                  */
-            } while (!done);
+            } while (!will_shutdown);
         } finally {
             if (main_loop_locked) {
                 main_loop_lock.unlock();
@@ -520,27 +542,104 @@ public class MercuryThreadPool
             }
         }
 
-        shutdown();
+        /*
+         * Shutdown
+         */
+        tasks_lock.lock();
+        try {
+            shutdown_now = true;
+            thread_wait_for_task_condition.signalAll();
+            running = false;
+        } finally {
+            tasks_lock.unlock();
+        }
     }
 
-    protected void shutdown()
+    /**
+     * Start the thread pool in it's own thread.
+     * Normally the thread pool ie executed directly by the main thread.
+     * However, when Mercury is used as a library by a native Java
+     * application this is not true, and the thread pool runs in a thread of
+     * it's own.
+     */
+    public MercuryThread startup()
     {
+        MercuryThread thread;
+
         tasks_lock.lock();
         try {
-            should_shutdown = true;
-            thread_wait_for_task_condition.signalAll();
+            if (running) {
+                return null;
+            }
+            running = true;
+        } finally {
+            tasks_lock.unlock();
+        }
+
+        startupInitialThreads();
+        thread = thread_factory.newThread(new Runnable() {
+            public void run() {
+                MercuryThreadPool.this.startupInitialThreads();
+                MercuryThreadPool.this.run();
+            }
+        });
+        thread.start();
+        return thread;
+    }
+
+    /**
+     * Request that the thread pool shutdown.
+     * This method does not wait for the thread pool to shutdown, it's an
+     * asychronous signal.  The thread pool will shutdown if: shutdown() has
+     * been called (implicitly when running as an application) and there are
+     * no remaining tasks either queued or running (spawn_native tasks are
+     * not included).  The requirement that the process does not exit until
+     * all tasks have finish is maintained by the JVM.
+     */
+    public boolean shutdown()
+    {
+        tasks_lock.lock();
+        try {
+            if (running && !shutdown_request) {
+                shutdown_request = true;
+            } else {
+                return false;
+            }
         } finally {
             tasks_lock.unlock();
         }
 
+        signalMainLoop();
+        return true;
     }
 
     /**
      * Run the main/2 predicate and wait for its completion.
+     * This method implicitly starts and stops the thread pool.
      */
-    public void runMain(Runnable run_main)
+    public void runMain(final Runnable run_main)
     {
-        Task main_task = new Task(run_main);
+        Task main_task;
+        Runnable run_main_and_shutdown;
+
+        run_main_and_shutdown = new Runnable() {
+            public void run() {
+                run_main.run();
+                shutdown();
+            }
+        };
+        main_task = new Task(run_main_and_shutdown);
+
+        tasks_lock.lock();
+        try {
+            if (running) {
+                throw new ThreadPoolStateError("ThreadPool is already running");
+            }
+            running = true;
+        } finally {
+            tasks_lock.unlock();
+        }
+
         startupInitialThreads();
         submit(main_task);
         try {
@@ -592,5 +691,15 @@ public class MercuryThreadPool
             return next_thread_id++;
         }
     }
+
+    /**
+     * The thread pool is in the wrong state for the action the caller tried
+     * to perform.
+     */
+    public static class ThreadPoolStateError extends Exception {
+        public ThreadPoolStateError(String message) {
+            super(message);
+        }
+    }
 }
 
diff --git a/samples/java_interface/standalone_java/JavaMain.java b/samples/java_interface/standalone_java/JavaMain.java
index cfb7572..8cc9f31 100644
--- a/samples/java_interface/standalone_java/JavaMain.java
+++ b/samples/java_interface/standalone_java/JavaMain.java
@@ -1,10 +1,10 @@
 // vim: ts=4 sw=4 et
 
-// The JavaInternal class in the jmercury.runtime package provides various
+// The MercuryRuntime class in the jmercury.runtime package provides various
 // Mercury runtime services that we may require.
 // All Mercury runtime and generated Java code lives in the jmercury package.
 //
-import jmercury.runtime.JavaInternal;
+import jmercury.runtime.MercuryRuntime;
 
 // The mercury_lib class is generated by the compiler when we build
 // mercury_lib library.
@@ -20,9 +20,36 @@ public class JavaMain {
         // We do not need to do anything to initialise the Java version of the
         // Mercury runtime.  It will be automatically initialised as the
         // relevant classes are loaded by the JVM.
-
         out.println("JavaMain: start main");
 
+        try {
+            runProgram(args);
+        } finally {
+            // When we have finished calling Mercury procedures then we need to
+            // tell the Mercury Runtime that we've finished using it.
+            // This invokes any finalisers specified using ':- finalise'
+            // declarations in the set of Mercury libraries we are using.  It
+            // also tells the thread pool to shutdown, if the thread pool is not
+            // runnuing then this does nothing.
+            // The static method finalise() in the MercuryRuntime class does
+            // this.
+            //
+            MercuryRuntime.finalise();
+
+            // The Mercury exit status (as set by io.set_exit_status/1) may
+            // be read from the MercuryRuntime class.
+            //
+            out.println("JavaMain: Mercury exit status = "
+                + MercuryRuntime.getExitStatus());
+
+            out.println("JavaMain: end main");
+        }
+
+        System.exit(MercuryRuntime.getExitStatus());
+    }
+
+    public static void runProgram(String[] args) {
+
         // This is a call to an exported Mercury procedure that does some I/O.
         // The mercury_lib class contains a static method for each procedure
         // that is foreign exported to Java.
@@ -33,21 +60,7 @@ public class JavaMain {
         //
         out.println("3^3 = " + mercury_lib.cube(3));
 
-        // When we have finished calling Mercury procedures then we need to
-        // invoke any finalisers specified using ':- finalise' declarations in
-        // the set of Mercury libraries we are using.
-        // The static method run_finalisers() in the JavaInternal class does
-        // this.  It will also perform any Mercury runtime finalisation that
-        // may be needed.
-        //
-        JavaInternal.run_finalisers();
-
-        // The Mercury exit status (as set by io.set_exit_status/1) may be read
-        // from the static field 'exit_status' in the JavaInternal class.
-        //
-        out.println("JavaMain: Mercury exit status = "
-            + JavaInternal.exit_status);
-
-        out.println("JavaMain: end main");
+        // Try a parallelised Mercury function.
+        out.println("fibs(40) = " + mercury_lib.fibs(40));
    }
 }
diff --git a/samples/java_interface/standalone_java/Makefile b/samples/java_interface/standalone_java/Makefile
index 407104a..8c3bf94 100644
--- a/samples/java_interface/standalone_java/Makefile
+++ b/samples/java_interface/standalone_java/Makefile
@@ -14,7 +14,7 @@ MER_JARS = $(MER_LIB_DIR)/mer_std.jar:$(MER_LIB_DIR)/mer_rt.jar
 .PHONY: all
 all: run
 
-JavaMain.class: JavaMain.java mercury_lib.jar
+JavaMain.class: JavaMain.java libmercury_lib.jar
 	$(JAVAC) JavaMain.java -cp $(MER_JARS):Mercury/classs -d .
 
 libmercury_lib.jar: mercury_lib.m
diff --git a/samples/java_interface/standalone_java/mercury_lib.m b/samples/java_interface/standalone_java/mercury_lib.m
index 4fcdacc..efc52c3 100644
--- a/samples/java_interface/standalone_java/mercury_lib.m
+++ b/samples/java_interface/standalone_java/mercury_lib.m
@@ -17,6 +17,11 @@
     %
 :- func cube(int) = int.
 
+    % fibs(N) returns the Nth fibbonanci number using a parallelised naive
+    % algorithm.
+    %
+:- func fibs(int) = int.
+
 %-----------------------------------------------------------------------------%
 %-----------------------------------------------------------------------------%
 
@@ -25,6 +30,8 @@
 :- import_module int.
 :- import_module list.
 :- import_module string.
+:- import_module thread.
+:- import_module thread.future.
 
 %-----------------------------------------------------------------------------%
 
@@ -42,6 +49,41 @@ write_hello(!IO) :-
 cube(X) = X * X * X.
 
 %-----------------------------------------------------------------------------%
+
+%
+% Trivial concurrency test.  No one would normally write fibs this way.
+%
+:- pragma foreign_export("Java", fibs(in) = out,
+    "fibs").
+fibs(N) = fibs_par(N).
+
+:- func fibs_par(int) = int.
+
+fibs_par(N) = F :-
+    ( N < 2 ->
+        F = 1
+    ; N > fibs_thresh ->
+        F2 = future((func) = fibs_par(N-2)),
+        F1 = fibs_par(N-1),
+        F = F1 + wait(F2)
+    ;
+        F = fibs_seq(N-1) + fibs_seq(N-2)
+    ).
+
+:- func fibs_seq(int) = int.
+
+fibs_seq(N) =
+    ( N < 2 ->
+        1
+    ;
+        fibs_seq(N-2) + fibs_seq(N-1)
+    ).
+
+:- func fibs_thresh = int.
+
+fibs_thresh = 20.
+
+%-----------------------------------------------------------------------------%
 %
 % Initialiser for this library
 %
-- 
2.1.3




More information about the reviews mailing list