[m-rev.] for review: Use a thread pool to manage threads on the Java backend
Peter Wang
novalazy at gmail.com
Tue Jul 22 14:52:03 AEST 2014
On Tue, 15 Jul 2014 17:14:12 +1000, Paul Bone <paul at bone.id.au> wrote:
> FOr review by anyone.
>
> Branches: master
>
> ---
>
> Use a thread pool to manage threads on the Java backend
> diff --git a/compiler/mlds_to_java.m b/compiler/mlds_to_java.m
> index 2ece6ab..a6f43a8 100644
> --- a/compiler/mlds_to_java.m
> +++ b/compiler/mlds_to_java.m
> @@ -2064,19 +2064,12 @@ write_main_driver(Indent, ClassName, !IO) :-
> "jmercury.runtime.JavaInternal.args = args;",
> "jmercury.runtime.JavaInternal.exit_status = 0;",
> "benchmarking.ML_initialise();",
> - "try {",
> - " " ++ ClassName ++ ".main_2_p_0();",
> - " jmercury.runtime.JavaInternal.run_finalisers();",
> - "} catch (jmercury.runtime.Exception e) {",
> - " exception.ML_report_uncaught_exception(",
> - " (univ.Univ_0) e.exception);",
> - " if (System.getenv(""MERCURY_SUPPRESS_STACK_TRACE"") == null) {",
> - " e.printStackTrace(System.err);",
> - " }",
> - " if (jmercury.runtime.JavaInternal.exit_status == 0) {",
> - " jmercury.runtime.JavaInternal.exit_status = 1;",
> - " }",
> - "}",
> + "Runnable run_main = new Runnable() {",
> + " public void run() {",
> + " " ++ ClassName ++ ".main_2_p_0();",
> + " }",
> + " };",
Indentation
> + "jmercury.runtime.MercuryThreadPool.getInstance().runMain(run_main);",
I suggest this detail also be moved into the runtime, e.g. by
providing jmercury.runtime.JavaInternal.runMain(Runnable).
> diff --git a/java/runtime/Getopt.java b/java/runtime/Getopt.java
> new file mode 100644
> index 0000000..184d98b
> --- /dev/null
> +++ b/java/runtime/Getopt.java
> @@ -0,0 +1,147 @@
> +//
> +// Copyright (C) 2014 The Mercury Team
> +// This file may only be copied under the terms of the GNU Library General
> +// Public License - see the file COPYING.LIB in the Mercury distribution.
> +//
> +
> +package jmercury.runtime;
> +
> +import java.util.*;
> +
> +/**
> + * Getopt style option passing.
> + * THis is currently very simple, it does only what is necessary.
> + */
This
> +public class Getopt implements Iterable<Getopt.Option>
> +{
> + String option_string;
> + List<Option> options;
> + List<String> arguments;
> +
> + /**
> + * Create a Getopt object to process the command line options.
> + */
> + public Getopt(String option_string)
> + {
> + this.option_string = option_string;
> + this.options = null;
> + }
> +
> + public Iterator<Option> iterator()
> + {
> + if (options == null) {
> + process();
> + }
> +
> + return options.iterator();
> + }
> +
> + /**
> + * Process the options. this is called automatically and the result is
> + * cached.
> + */
This
> + public void process()
> + {
> + StringTokenizer tok = new StringTokenizer(option_string);
> + Option option = null;
> +
> + options = new LinkedList<Option>();
> + arguments = new LinkedList<String>();
> +
> + while (tok.hasMoreTokens()) {
> + String str;
> +
> + str = tok.nextToken();
> + if (str.startsWith("-")) {
> + if (option != null) {
> + options.add(option);
> + option = null;
> + }
> + option = new Option(str);
> + } else {
> + if (option != null) {
> + option.setValue(str);
> + options.add(option);
> + option = null;
> + } else {
> + arguments.add(str);
> + /*
> + * The token we've got is an argument not an option,
> + * Store the remaining strings as arguments and stop
> + * processing.
> + */
> + while (tok.hasMoreTokens()) {
> + str = tok.nextToken();
> + arguments.add(str);
> + }
> + break;
> + }
> + }
> + }
> + }
> +
> + /**
> + * Get the arguments from the command line. Arguments are not options,
> + * they occur after the options list.
> + */
> + public List<String> getArguments() {
> + return arguments;
> + }
> +
> + /**
> + * An option. Call getopt.nextOption() to get each option.
> + */
> + public static class Option
> + {
> + private String option;
> + private String value;
What values can 'option' take on?
> +
> + public Option(String option, String value)
> + {
> + this.option = option;
> + this.value = value;
> + }
> +
> + public Option(String option)
> + {
> + this(option, null);
> + }
> +
> + public void setValue(String value)
> + {
> + this.value = value;
> + }
> +
> + public boolean optionIs(String option)
> + {
> + return (this.option.equals(option) ||
> + this.option.equals("-" + option) ||
> + this.option.equals("--" + option));
> + }
I think the caller should just pass the expected string, including the
correct number of dashes.
> +
> + public int getValueInt()
> + throws NumberFormatException
> + {
> + return Integer.parseInt(value);
> + }
> +
> + public String toString()
> + {
> + StringBuilder builder = new StringBuilder();
> +
> + if (option.length() == 1) {
> + builder.append("-");
> + } else if (!option.startsWith("-")) {
> + // This must be a double dash argument.
> + builder.append("--");
> + }
Here also, I think the option member ought to be unambiguous.
> + builder.append(option);
> + builder.append(" ");
> + builder.append(value);
> +
> + return builder.toString();
> + }
> + }
> +}
> +
> +
> diff --git a/java/runtime/JavaInternal.java b/java/runtime/JavaInternal.java
> index 0868441..a1c7352 100644
> --- a/java/runtime/JavaInternal.java
> +++ b/java/runtime/JavaInternal.java
> @@ -31,4 +35,45 @@ public class JavaInternal {
> r.run();
> }
> }
> +
> + /**
> + * Report uncaught exceptions.
> + * This reports exceptions using the exception reporter object, if it is
> + * set. Otherwise we make a best effort at reporting it.
> + *
> + * Exception reporting code is written in exception.m which we cannot
> + * link to from here. When that code starts it should set an exception
> + * reporter object.
> + */
> + public static void reportUncaughtException(jmercury.runtime.Exception e)
> + {
> + if (null != exception_reporter) {
> + exception_reporter.reportUncaughtException(e);
> + } else {
> + System.out.flush();
> + System.err.println("Uncaught exception: " + e.getMessage());
> + System.err.flush();
> + }
> +
> + if (shouldPrintStackTrace()) {
> + e.printStackTrace(System.err);
> + }
> + if (exit_status == 0) {
> + exit_status = 1;
> + }
> + }
> +
> + /**
> + * Should we print Java stack traces when we catch Mercury exceptions?
> + */
> + public static boolean shouldPrintStackTrace() {
> + return System.getenv("MERCURY_SUPPRESS_STACK_TRACE") == null;
> + }
> +
> + /**
> + * Intarface for reporting exceptions.
> + */
Interface
> diff --git a/java/runtime/MercuryOptions.java b/java/runtime/MercuryOptions.java
> new file mode 100644
> index 0000000..6f4c099
> --- /dev/null
> +++ b/java/runtime/MercuryOptions.java
> @@ -0,0 +1,72 @@
> +//
> +// Copyright (C) 2014 The Mercury Team
> +// This file may only be copied under the terms of the GNU Library General
> +// Public License - see the file COPYING.LIB in the Mercury distribution.
> +//
> +
> +package jmercury.runtime;
> +
> +import java.util.List;
> +
> +/**
> + * Process the MERCURY_OPTIONS environment variable.
> + */
> +public class MercuryOptions {
> +
> + private static MercuryOptions instance;
> +
> + /**
> + * Get the singleton instance
> + */
> + public static synchronized MercuryOptions getInstance()
> + {
> + if (instance == null) {
> + instance = new MercuryOptions();
> + instance.process();
> + }
> +
> + return instance;
> + }
MercuryThreadPool is already a singleton class so MercuryOptions doesn't
need to be a singleton class itself. You could remove some code.
> +
> + private int num_processors;
> +
> + public MercuryOptions()
> + {
> + // Zero means autodetect
> + num_processors = 0;
> + }
> +
> + public void process()
> + {
> + String options;
> +
> + options = System.getenv("MERCURY_OPTIONS");
> + if (options != null) {
> + Getopt getopt = new Getopt(options);
> + List<String> args;
> +
> + for (Getopt.Option option : getopt) {
> + if (option.optionIs("P")) {
> + num_processors = option.getValueInt();
> + } else {
> + System.err.println("Unrecognized option: " + option);
> + System.exit(1);
> + }
> + }
> + args = getopt.getArguments();
> + if (args.size() > 0) {
> + System.out.println(
> + "Error parsing MERCURY_OPTIONS environment variable,"
> + + " unexpected: " + args.get(0));
> + }
stderr?
> diff --git a/java/runtime/MercuryThreadPool.java b/java/runtime/MercuryThreadPool.java
> new file mode 100644
> index 0000000..c72e34c
> --- /dev/null
> +++ b/java/runtime/MercuryThreadPool.java
> @@ -0,0 +1,578 @@
> +//
> +// Copyright (C) 2014 The Mercury Team
> +// This file may only be copied under the terms of the GNU Library General
> +// Public License - see the file COPYING.LIB in the Mercury distribution.
> +//
> +
> +package jmercury.runtime;
> +
> +import java.util.concurrent.locks.Condition;
> +import java.util.concurrent.locks.Lock;
> +import java.util.concurrent.locks.ReentrantLock;
> +import java.util.concurrent.ThreadFactory;
> +import java.util.*;
> +
> +/**
> + * This class manages execution of Mercury threads and tasks.
> + *
> + * The pool attempts to reduce overheads, especially in embarrassingly
> + * parallel workloads, by re-using threads and restricting the overall
> + * number of threads. By default a thread is created for each hardware
> + * thread available. If all threads are making progress then no new threads
> + * will be created even if tasks are waiting, reducing overheads. But if
> + * one or more threads block on a barrier, channel, mvar or semaphore, then
> + * new threads may be created to avoid deadlocks and attempt to keep all
> + * processors busy.
> + *
> + * TODO: Currently the thread pool does not know when a thread is blocked by
> + * a barrier, channel, mvar or semaphore.
> + *
> + * TODO: Currently the thread pool does not know when a thread is blocked in
> + * foreign code or performing IO.
> + *
> + * Java's API provides several different thread pools, see
> + * java.util.concurrent.Executors, none of which are suitable for our needs
> + * so we have implemented our own in this class. Specifically the fixed
> + * thread pool is unsuitable as we want to be able to temporarily exceed the
> + * normal number of threads to overcome deadlocks (and to keep processors
> + * busy); and the cached thread pools, which are also very similar to the
> + * ThreadPoolExecutor class, create threads (up to a maximum number) even if
> + * all the processors are busy. Additionally we cannot instrument this code
> + * as easily for parallel profiling.
> + *
> + * The thread pool should be executed by calling getInstance() and run()
> + * from the primordial thread. run() will exit once all the work has been
> + * completed, it will shutdown the worker threads as it exits. The
completed; it will
> + * runMain() method provides a convenient wrapper for run which also
> + * executes a task, such as the main/2 predicate of the application.
> + */
> +public class MercuryThreadPool
> + implements Runnable
> +{
> + private static MercuryThreadPool instance;
> +
> + private MercuryThreadFactory thread_factory;
> +
> + /*
> + * Locking:
> + * Rather than synchronize on the 'this' pointer we create explicit
> + * locks. The numbers of threads and list of threads are protected by
> + * the threads_lock lock. The queue of tasks is protected by the
> + * tasks_lock. Separate condition variables are also used, This avoids
> + * threads waiting on conditions to not be woken up by a different
> + * condition, for example for example a thread can wait for the pool to
> + * shutdown without being woken when new work arrives.
> + *
> + * Safety: If you acquire more than one lock you must acquire locks in
> + * this order: tasks_lock, threads_lock then main_loop_lock. If
> + * you wait on any condition you must only hold that condition's
> + * lock.
> + */
> +
> + /*
> + * Worker threads
> + */
> + private int thread_pool_size;
> + private int user_specified_size;
> + /*
> + * The sum of num_threads_* is the total number of threads in the pool
> + */
> + private int num_threads_working;
> + private int num_threads_waiting;
> + private int num_threads_other;
> + private LinkedList<MercuryThread> threads;
> + private Lock threads_lock;
> +
> + /*
> + * Tasks
> + */
> + private Deque<Task> tasks;
> + private boolean should_shutdown;
> + private long num_tasks_submitted;
> + private long num_tasks_completed;
> + private Lock tasks_lock;
> + private Condition thread_wait_for_task_condition;
> +
> + /*
> + * Main loop condition.
> + */
> + private Lock main_loop_lock;
> + private Condition main_loop_condition;
> +
> + /**
> + * Get the singleton instance for the thread pool.
> + */
> + public synchronized static MercuryThreadPool getInstance()
> + {
> + if (instance == null) {
> + int p = MercuryOptions.getInstance().getNumProcessors();
> + instance = new MercuryThreadPool(p);
> + }
> +
> + return instance;
> + }
> +
> + /**
> + * Private constructor
> + */
> + private MercuryThreadPool(int size)
> + {
> + thread_factory = new MercuryThreadFactory(this);
> +
> + user_specified_size = size;
> + thread_pool_size = size;
> + num_threads_working = 0;
> + num_threads_waiting = 0;
> + num_threads_other = 0;
> + threads = new LinkedList<MercuryThread>();
> + threads_lock = new ReentrantLock();
> +
> + tasks = new ArrayDeque<Task>(size*4);
Please comment why *4.
> + should_shutdown = false;
> + num_tasks_submitted = 0;
> + num_tasks_completed = 0;
> + tasks_lock = new ReentrantLock();
> + thread_wait_for_task_condition = tasks_lock.newCondition();
> +
> + main_loop_lock = new ReentrantLock();
> + main_loop_condition = main_loop_lock.newCondition();
> + }
> +
> + /**
> + * Wait for a task to arrive.
> + * This call will block if there are no tasks available to execute.
> + * @return A task to execute.
> + */
> + public Task workerGetTask()
> + throws InterruptedException
> + {
> + Task task;
> +
> + tasks_lock.lock();
> + try {
> + do {
> + if (tooManyThreadsWaiting()) {
> + /*
> + * We already have plenty of threads waiting for
> + * work. Ask this one to shutdown.
> + */
> + return null;
> + }
> +
> + task = tasks.poll();
> + if (task != null) {
> + return task;
> + }
> +
> + /*
> + * If there are tasks currently being executed that spawn
> + * other tasks while should_shutdown is true, then there's a
> + * possibility that this could deadlock as we don't check
> + * that here.
> + */
XXX ?
> + if (should_shutdown) {
> + return null;
> + }
> +
> + thread_wait_for_task_condition.await();
> + } while (true);
> + } finally {
> + tasks_lock.unlock();
> + }
> + }
> +
> + protected boolean tooManyThreadsWaiting()
> + {
> + if (num_threads_waiting > thread_pool_size) {
> + threads_lock.lock();
> + // Recheck with lock.
> + try {
> + return num_threads_waiting > thread_pool_size;
> + } finally {
> + threads_lock.unlock();
> + }
> + } else {
> + return false;
> + }
> + }
num_threads_waiting might need 'volatile'.
This pattern always raises some flags so please add a comment.
http://en.wikipedia.org/wiki/Double-checked_locking
> + /**
> + * Check threads.
> + * Chacks the numbers and status of the worker threads and starts more
Checks
> + * threads if required.
> + * @return The number of currently working/blocked threads.
> + */
> + protected int checkThreads()
> + {
> + int num_new_threads;
> + int num_working_blocked_threads;
> + List<MercuryWorkerThread> new_threads =
> + new LinkedList<MercuryWorkerThread>();
> +
> + /*
> + * If necessary poll the Java runtime to see if the number of
> + * available processors has changed. I don't know if this actually
> + * changes in practice however the Java API says that one can and
> + * should poll it.
> + */
> + thread_pool_size = user_specified_size > 0 ? user_specified_size :
> + Runtime.getRuntime().availableProcessors();
Wrap the condition in brackets.
> + /*
> + * Start the threads while we're not holding the lock, this makes
> + * the above critical section smaller.
> + */
> + for (MercuryWorkerThread t : new_threads) {
> + t.start();
> + }
> +
> + /*
> + * If there are too many threads then superflous threads will
> + * shutdown when they try to get a new task.
> + */
> + return num_working_blocked_threads;
superfluous
(and strictly speaking, "shut down")
> + }
> +
...
> + /**
> + * Run the thread pool. This is usually called by runMain()
> + */
> + public void run()
> + {
> + boolean done = false;
> + int num_working_blocked_threads;
> + long num_tasks_submitted;
> + long num_tasks_completed;
> + boolean tasks_locked = false;
> + boolean main_loop_locked = false;
> +
> + try {
> + do {
> + /*
> + * Have all the tasks been completed?
> + */
> + tasks_lock.lock();
> + try {
> + num_tasks_submitted = this.num_tasks_submitted;
> + num_tasks_completed = this.num_tasks_completed;
> + done = (num_tasks_submitted == num_tasks_completed);
> +
> + if (!done) {
> + /*
> + * Start new threads if we have fewer than the
> + * thread_pool_size
> + */
> + num_working_blocked_threads = checkThreads();
> +
> + /*
> + * Acquire the main loop lock while we're still
> + * holding tasks_lock. This prevents a race whereby
> + * we release the locks below and then the last task
> + * finishes but we don't get its signal on
> + * main_loop_condition because we weren't holding
> + * the lock.
> + *
> + * To preserve the locking order we must NOT
> + * reacquire tasks_lock after releasing them while
> + * still holding the main loop lock. This must also
> + * be executed after checkThreads();
> + */
> + main_loop_lock.lock();
> + main_loop_locked = true;
> + }
> + } finally {
> + tasks_lock.unlock();
> + tasks_locked = false;
> + }
> +
> + if (!done) {
> + if (!main_loop_locked) {
> + main_loop_lock.lock();
> + main_loop_locked = true;
> + }
> + try {
> + main_loop_condition.await();
> + } catch (InterruptedException e) {
> + continue;
> + } finally {
> + main_loop_lock.unlock();
> + main_loop_locked = false;
> + }
> + }
> +
> + /*
> + * If we don't execute the if branch above, then the
> + * main_lock cannot be held because neither of the two
> + * places where it could have been acquired would be
> + * executed because done == true.
> + */
> + } while (!done);
> + } finally {
> + if (main_loop_locked) {
> + main_loop_lock.unlock();
> + }
> + if (tasks_locked) {
> + tasks_lock.unlock();
> + }
> + }
You don't have to change it, but I found this method confusing to
follow.
> +
> + shutdown();
> + }
> +
> + protected void shutdown()
> + {
> + tasks_lock.lock();
> + try {
> + should_shutdown = true;
> + thread_wait_for_task_condition.signalAll();
> + } finally {
> + tasks_lock.unlock();
> + }
> +
> + }
> +
> + /**
> + * Run the main thread and wait for it's completion.
> + */
its
Maybe:
Run the main/2 predicate and wait for its completion.
as "main thread" is often used interchangeably with "primordial thread".
> + public void runMain(Runnable run_main)
> + {
> + Task main_task = new Task(run_main);
> + submit(main_task);
> + try {
> + /*
> + * This thread (the primordial thread) operates the thread pool
> + * until the program is finished
> + */
> + run();
> + jmercury.runtime.JavaInternal.run_finalisers();
> + } catch (jmercury.runtime.Exception e) {
> + JavaInternal.reportUncaughtException(e);
> + }
> + }
> +
> diff --git a/java/runtime/MercuryWorkerThread.java b/java/runtime/MercuryWorkerThread.java
> new file mode 100644
> index 0000000..b0aa0a2
> --- /dev/null
> +++ b/java/runtime/MercuryWorkerThread.java
> @@ -0,0 +1,92 @@
...
> + /**
> + * Run.
> + * The worker thread executes tasks that it retrives from the pool.
> + */
> + public void run()
> + {
> + Task task;
> +
> + do {
> + task = null;
> + try {
> + if (status != ThreadStatus.IDLE) {
> + setStatusIdle();
> + }
> + task = pool.workerGetTask();
> + }
> + catch (InterruptedException e) {
> + /*
> + ** A worker thread has no semantics for this, so we continue
> + ** looping.
> + */
> + continue;
> + }
> + if (task != null) {
> + try {
> + setStatusWorking();
> + task.run();
> + pool.taskDone(task);
> + } catch (jmercury.runtime.Exception e) {
> + // The task threw a Mercury exception.
> + pool.taskFailed(task, e);
> + JavaInternal.reportUncaughtException(e);
> + } catch (Throwable e) {
> + // Some other error occured. bail out.
> + System.err.println("Uncaught exception: " + e.toString());
> + System.err.println(e.getMessage());
> + e.printStackTrace();
> + System.exit(1);
Hmm, is there a less clean way to leave, like abort() vs exit() in C?
> + } finally {
> + setStatusOther();
> + }
> + }
> + } while (task != null);
> +
> + pool.threadShutdown(this, status);
> + }
> +
> + protected void setStatusIdle() {
> + pool.updateThreadCounts(status, ThreadStatus.IDLE);
> + status = ThreadStatus.IDLE;
> + }
> +
> + protected void setStatusWorking() {
> + pool.updateThreadCounts(status, ThreadStatus.WORKING);
> + status = ThreadStatus.WORKING;
> + }
> +
> + protected void setStatusOther() {
> + pool.updateThreadCounts(status, ThreadStatus.OTHER);
> + status = ThreadStatus.OTHER;
> + }
> +}
Why not a single method?
> diff --git a/java/runtime/ThreadStatus.java b/java/runtime/ThreadStatus.java
> new file mode 100644
> index 0000000..4ba0294
> --- /dev/null
> +++ b/java/runtime/ThreadStatus.java
> @@ -0,0 +1,14 @@
> +//
> +// Copyright (C) 2014 The Mercury Team
> +// This file may only be copied under the terms of the GNU Library General
> +// Public License - see the file COPYING.LIB in the Mercury distribution.
> +//
> +
> +package jmercury.runtime;
> +
> +public enum ThreadStatus {
> + WORKING,
> + IDLE,
> + OTHER
> +}
> +
Add a short comment, at least for OTHER.
> diff --git a/library/exception.m b/library/exception.m
> index cd1507b..2c611bf 100644
> --- a/library/exception.m
> +++ b/library/exception.m
> @@ -2919,4 +2919,28 @@ now_near_stack_limits :-
> semidet_fail.
>
> %-----------------------------------------------------------------------------%
> +
> + % The Java runtime system sometiems wants to report exceptions, give it
> + % a reference that it can use to call library code to report exceptions.
> + %
sometimes
Make it two sentences.
> +:- pragma foreign_code("Java", "
> +
> +public static class ReportUncaughtException
> + implements jmercury.runtime.JavaInternal.ExceptionReporter
> +{
> + public void reportUncaughtException(jmercury.runtime.Exception e)
> + {
> + ML_report_uncaught_exception((univ.Univ_0) e.exception);
> + }
> +}
> +
> +static {
> + if (null == jmercury.runtime.JavaInternal.exception_reporter) {
> + jmercury.runtime.JavaInternal.exception_reporter =
> + new ReportUncaughtException();
> + }
> +}
> +").
Maybe a method JavaInternal.set_exception_reporter?
> diff --git a/library/thread.m b/library/thread.m
> index 4e3739b..676fd4e 100644
> --- a/library/thread.m
> +++ b/library/thread.m
...
> @@ -656,23 +682,24 @@ private class MercuryThread {
> }").
>
> :- pragma foreign_code("Java", "
> -private static class MercuryThread implements Runnable {
> - private final Object[] Goal;
> - private String ThreadId;
> +public static class RunGoal implements Runnable {
> + private final Object[] goal;
> + private String id;
>
> - private MercuryThread(Object[] g)
> + public RunGoal(Object[] g)
> {
> - Goal = g;
> + goal = g;
> + id = null;
> }
>
> - private void setThreadId(String id)
> + public void setId(String id)
> {
> - ThreadId = id;
> + this.id = id;
> }
Did you mean to change the access qualifiers?
Otherwise, I assume it's fine.
Peter
More information about the reviews
mailing list