[m-rev.] for review: Use a thread pool to manage threads on the Java backend

Paul Bone paul at bone.id.au
Tue Jul 29 21:54:44 AEST 2014


On Tue, Jul 22, 2014 at 02:52:03PM +1000, Peter Wang wrote:
> On Tue, 15 Jul 2014 17:14:12 +1000, Paul Bone <paul at bone.id.au> wrote:
> > +
> > +    /**
> > +     * An option.  Call getopt.nextOption() to get each option.
> > +     */
> > +    public static class Option
> > +    {
> > +        private String option;
> > +        private String value;
> 
> What values can 'option' take on?

I've made this clearer by specifying that 'option' must be given without a
leading - or --, and adjusted the other code to conform to this.
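
A minimal sketch of that convention, for illustration only.  OptionSketch
and stripDashes() are hypothetical names and not the code in the patch; only
optionIs() and getValueInt() appear in the quoted diff.  The parser strips
the dashes once, so every later comparison uses the bare name.

```java
// Hypothetical sketch; not the actual Getopt.Option from the patch.
final class OptionSketch {
    private final String option;   // stored without a leading "-" or "--"
    private final String value;    // may be null if the option has no value

    OptionSketch(String option, String value) {
        if (option.startsWith("-")) {
            throw new IllegalArgumentException(
                "option name must not start with '-': " + option);
        }
        this.option = option;
        this.value = value;
    }

    /** Strip one leading "--" or "-" from a raw token (assumed helper). */
    static String stripDashes(String token) {
        if (token.startsWith("--")) {
            return token.substring(2);
        } else if (token.startsWith("-")) {
            return token.substring(1);
        }
        return token;
    }

    /** Compare against a bare option name such as "P". */
    boolean optionIs(String name) {
        return option.equals(name);
    }

    /** Parse the value as an int; throws NumberFormatException if invalid. */
    int getValueInt() {
        return Integer.parseInt(value);
    }
}
```

With this shape a caller writes option.optionIs("P") rather than
option.optionIs("-P"), which matches the usage in MercuryOptions.process().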

> > diff --git a/java/runtime/MercuryOptions.java b/java/runtime/MercuryOptions.java
> > new file mode 100644
> > index 0000000..6f4c099
> > --- /dev/null
> > +++ b/java/runtime/MercuryOptions.java
> > @@ -0,0 +1,72 @@
> > +//
> > +// Copyright (C) 2014 The Mercury Team 
> > +// This file may only be copied under the terms of the GNU Library General
> > +// Public License - see the file COPYING.LIB in the Mercury distribution.
> > +//
> > +
> > +package jmercury.runtime;
> > +
> > +import java.util.List;
> > +
> > +/**
> > + * Process the MERCURY_OPTIONS environment variable.
> > + */
> > +public class MercuryOptions {
> > +    
> > +    private static MercuryOptions instance;
> > +
> > +    /**
> > +     * Get the singleton instance
> > +     */
> > +    public static synchronized MercuryOptions getInstance()
> > +    {
> > +        if (instance == null) {
> > +            instance = new MercuryOptions();
> > +            instance.process();
> > +        }
> > +
> > +        return instance;
> > +    }
> 
> MercuryThreadPool is already a singleton class so MercuryOptions doesn't
> need to be a singleton class itself.  You could remove some code.

What if other things add options to MercuryOptions in the future?  Or
perhaps we should introduce a class (or make an existing class) responsible
for holding the handles of all the singleton objects.
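
A rough sketch of what such a holder class could look like.  All names here
(RuntimeSingletons, OptionsSketch, PoolSketch) are hypothetical stand-ins,
not classes from the patch; the point is only that one class owns the
handles and the lazy construction order.

```java
// Hypothetical stand-ins for MercuryOptions and MercuryThreadPool.
final class OptionsSketch {
    private int numProcessors = 0;  // zero means autodetect, as in the patch

    int getNumProcessors() {
        return numProcessors;
    }
}

final class PoolSketch {
    private final int size;

    PoolSketch(int size) {
        this.size = size;
    }

    int getSize() {
        return size;
    }
}

// One class holds every singleton handle, so the individual classes
// no longer need their own getInstance() boilerplate.
final class RuntimeSingletons {
    private static OptionsSketch options;
    private static PoolSketch pool;

    private RuntimeSingletons() {}

    static synchronized OptionsSketch getOptions() {
        if (options == null) {
            options = new OptionsSketch();
        }
        return options;
    }

    static synchronized PoolSketch getThreadPool() {
        if (pool == null) {
            // Intrinsic locks are reentrant, so this nested call is safe.
            pool = new PoolSketch(getOptions().getNumProcessors());
        }
        return pool;
    }
}
```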

> > +
> > +    private int num_processors;
> > +
> > +    public MercuryOptions()
> > +    {
> > +        // Zero means autodetect
> > +        num_processors = 0;
> > +    }
> > +
> > +    public void process()
> > +    {
> > +        String          options;
> > +
> > +        options = System.getenv("MERCURY_OPTIONS");
> > +        if (options != null) {
> > +            Getopt getopt = new Getopt(options);
> > +            List<String> args;
> > +
> > +            for (Getopt.Option option : getopt) {
> > +                if (option.optionIs("P")) {
> > +                    num_processors = option.getValueInt();
> > +                } else {
> > +                    System.err.println("Unrecognized option: " + option);
> > +                    System.exit(1);
> > +                }
> > +            }
> > +            args = getopt.getArguments();
> > +            if (args.size() > 0) {
> > +                System.out.println(
> > +                    "Error parsing MERCURY_OPTIONS environment variable,"
> > +                    + " unexpected: " + args.get(0));
> > +            }
> 
> stderr?

Thanks.


> > diff --git a/java/runtime/MercuryThreadPool.java b/java/runtime/MercuryThreadPool.java
> > new file mode 100644
> > index 0000000..c72e34c
> > --- /dev/null
> > +++ b/java/runtime/MercuryThreadPool.java
> > @@ -0,0 +1,578 @@
> > +//
> > +// Copyright (C) 2014 The Mercury Team
> > +// This file may only be copied under the terms of the GNU Library General
> > +// Public License - see the file COPYING.LIB in the Mercury distribution.
> > +//
> > +
> > +package jmercury.runtime;
> > +
> > +import java.util.concurrent.locks.Condition;
> > +import java.util.concurrent.locks.Lock;
> > +import java.util.concurrent.locks.ReentrantLock;
> > +import java.util.concurrent.ThreadFactory;
> > +import java.util.*;
> > +
> > +/**
> > + * This class manages execution of Mercury threads and tasks.
> > + *
> > + * The pool attempts to reduce overheads, especially in embarrassingly
> > + * parallel workloads, by re-using threads and restricting the overall
> > + * number of threads.  By default a thread is created for each hardware
> > + * thread available.  If all threads are making progress then no new threads
> > + * will be created even if tasks are waiting, reducing overheads.  But if
> > + * one or more threads block on a barrier, channel, mvar or semaphore, then
> > + * new threads may be created to avoid deadlocks and attempt to keep all
> > + * processors busy.
> > + *
> > + * TODO: Currently the thread pool does not know when a thread is blocked by
> > + * a barrier, channel, mvar or semaphore.
> > + *
> > + * TODO: Currently the thread pool does not know when a thread is blocked in
> > + * foreign code or performing IO.
> > + *
> > + * Java's API provides several different thread pools, see
> > + * java.util.concurrent.Executors, none of which are suitable for our needs
> > + * so we have implemented our own in this class.  Specifically the fixed
> > + * thread pool is unsuitable as we want to be able to temporarily exceed the
> > + * normal number of threads to overcome deadlocks (and to keep processors
> > + * busy); and the cached thread pools, which are also very similar to the
> > + * ThreadPoolExecutor class, create threads (up to a maximum number) even if
> > + * all the processors are busy.  Additionally we cannot instrument this code
> > + * as easily for parallel profiling.
> > + *
> > + * The thread pool should be executed by calling getInstance() and run()
> > + * from the primordial thread.  run() will exit once all the work has been
> > + * completed, it will shutdown the worker threads as it exits.  The
> 
> completed; it will
> 
> > + * runMain() method provides a convenient wrapper for run which also
> > + * executes a task, such as the main/2 predicate of the application.
> > + */
> > +public class MercuryThreadPool
> > +    implements Runnable
> > +{
> > +    private static MercuryThreadPool    instance;
> > +
> > +    private MercuryThreadFactory        thread_factory;
> > +
> > +    /*
> > +     * Locking:
> > +     * Rather than synchronize on the 'this' pointer we create explicit
> > +     * locks.  The numbers of threads and list of threads are protected by
> > +     * the threads_lock lock.  The queue of tasks is protected by the
> > +     * tasks_lock.  Separate condition variables are also used,  This avoids
> > +     * threads waiting on conditions to not be woken up by a different
> > +     * condition, for example for example a thread can wait for the pool to
> > +     * shutdown without being woken when new work arrives.
> > +     *
> > +     * Safety: If you acquire more than one lock you must acquire locks in
> > +     *         this order: tasks_lock, threads_lock then main_loop_lock.  If
> > +     *         you wait on any condition you must only hold that condition's
> > +     *         lock.
> > +     */
> > +
> > +    /*
> > +     * Worker threads
> > +     */
> > +    private int                         thread_pool_size;
> > +    private int                         user_specified_size;
> > +    /*
> > +     * The sum of num_threads_* is the total number of threads in the pool
> > +     */
> > +    private int                         num_threads_working;
> > +    private int                         num_threads_waiting;
> > +    private int                         num_threads_other;
> > +    private LinkedList<MercuryThread>   threads;
> > +    private Lock                        threads_lock;
> > +
> > +    /*
> > +     * Tasks
> > +     */
> > +    private Deque<Task>             tasks;
> > +    private boolean                 should_shutdown;
> > +    private long                    num_tasks_submitted;
> > +    private long                    num_tasks_completed;
> > +    private Lock                    tasks_lock;
> > +    private Condition               thread_wait_for_task_condition;
> > +
> > +    /*
> > +     * Main loop condition.
> > +     */
> > +    private Lock                    main_loop_lock;
> > +    private Condition               main_loop_condition;
> > +
> > +    /**
> > +     * Get the singleton instance for the thread pool.
> > +     */
> > +    public synchronized static MercuryThreadPool getInstance()
> > +    {
> > +        if (instance == null) {
> > +            int p = MercuryOptions.getInstance().getNumProcessors();
> > +            instance = new MercuryThreadPool(p);
> > +        }
> > +
> > +        return instance;
> > +    }
> > +
> > +    /**
> > +     * Private constructor
> > +     */
> > +    private MercuryThreadPool(int size)
> > +    {
> > +        thread_factory = new MercuryThreadFactory(this);
> > +
> > +        user_specified_size = size;
> > +        thread_pool_size = size;
> > +        num_threads_working = 0;
> > +        num_threads_waiting = 0;
> > +        num_threads_other = 0;
> > +        threads = new LinkedList<MercuryThread>();
> > +        threads_lock = new ReentrantLock();
> > +
> > +        tasks = new ArrayDeque<Task>(size*4);
> 
> Please comment why *4.
>
> > +        should_shutdown = false;
> > +        num_tasks_submitted = 0;
> > +        num_tasks_completed = 0;
> > +        tasks_lock = new ReentrantLock();
> > +        thread_wait_for_task_condition = tasks_lock.newCondition();
> > +
> > +        main_loop_lock = new ReentrantLock();
> > +        main_loop_condition = main_loop_lock.newCondition();
> > +    }
> > +
> 
> 
> > +    /**
> > +     * Wait for a task to arrive.
> > +     * This call will block if there are no tasks available to execute.
> > +     * @return A task to execute.
> > +     */
> > +    public Task workerGetTask()
> > +            throws InterruptedException
> > +    {
> > +        Task    task;
> > +
> > +        tasks_lock.lock();
> > +        try {
> > +            do {
> > +                if (tooManyThreadsWaiting()) {
> > +                    /*
> > +                     * We already have plenty of threads waiting for
> > +                     * work.  Ask this one to shutdown.
> > +                     */
> > +                    return null;
> > +                }
> > +
> > +                task = tasks.poll();
> > +                if (task != null) {
> > +                    return task;
> > +                }
> > +
> > +                /*
> > +                 * If there are tasks currently being executed that spawn
> > +                 * other tasks while should_shutdown is true, then there's a
> > +                 * possibility that this could deadlock as we don't check
> > +                 * that here.
> > +                 */
> 
> XXX ?
> 

Erm, yes.  I think I fix this in my next patch, but I'll add the XXX and
double-check.

> > +                if (should_shutdown) {
> > +                    return null;
> > +                }
> > +
> > +                thread_wait_for_task_condition.await();
> > +            } while (true);
> > +        } finally {
> > +            tasks_lock.unlock();
> > +        }
> > +    }
> > +
> > +    protected boolean tooManyThreadsWaiting()
> > +    {
> > +        if (num_threads_waiting > thread_pool_size) {
> > +            threads_lock.lock();
> > +            // Recheck with lock.
> > +            try {
> > +                return num_threads_waiting > thread_pool_size;
> > +            } finally {
> > +                threads_lock.unlock();
> > +            }
> > +        } else {
> > +            return false;
> > +        }
> > +    }
> 
> num_threads_waiting might need 'volatile'.
> This pattern always raises some flags so please add a comment.
> http://en.wikipedia.org/wiki/Double-checked_locking

I think you're right.  I misunderstood Java's model, thinking that the call
to lock() would work like a memory barrier for all variables, not just
volatile ones.  I think it does from the compiler's perspective, but not
necessarily from the memory write's perspective.
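
A sketch of the pattern with the field marked volatile, under the assumption
that all writes to the counter happen while holding threads_lock.  The class
and method names are illustrative, not the ones in MercuryThreadPool.  Note
that the recheck made while holding the lock is already ordered after
earlier unlocks of the same lock; it is the first read, made outside any
lock, that has no visibility guarantee unless the field is volatile.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; not the actual MercuryThreadPool code.
final class WaitingCounterSketch {
    private final Lock threadsLock = new ReentrantLock();
    private final int threadPoolSize;

    // volatile: the unlocked fast-path read below must see recent writes.
    // All writes are made while holding threadsLock.
    private volatile int numThreadsWaiting = 0;

    WaitingCounterSketch(int threadPoolSize) {
        this.threadPoolSize = threadPoolSize;
    }

    void threadStartedWaiting() {
        threadsLock.lock();
        try {
            numThreadsWaiting++;
        } finally {
            threadsLock.unlock();
        }
    }

    void threadStoppedWaiting() {
        threadsLock.lock();
        try {
            numThreadsWaiting--;
        } finally {
            threadsLock.unlock();
        }
    }

    boolean tooManyThreadsWaiting() {
        // Fast path: a cheap unlocked read.  The answer is only a hint,
        // so it is rechecked under the lock before being acted upon.
        if (numThreadsWaiting <= threadPoolSize) {
            return false;
        }
        threadsLock.lock();
        try {
            return numThreadsWaiting > threadPoolSize;
        } finally {
            threadsLock.unlock();
        }
    }
}
```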

> ...
> > +    /**
> > +     * Run the thread pool.  This is usually called by runMain()
> > +     */
> > +    public void run()
> > +    {
> > +        boolean done = false;
> > +        int     num_working_blocked_threads;
> > +        long    num_tasks_submitted;
> > +        long    num_tasks_completed;
> > +        boolean tasks_locked = false;
> > +        boolean main_loop_locked = false;
> > +
> > +        try {
> > +            do {
> > +                /*
> > +                 * Have all the tasks been completed?
> > +                 */
> > +                tasks_lock.lock();
> > +                try {
> > +                    num_tasks_submitted = this.num_tasks_submitted;
> > +                    num_tasks_completed = this.num_tasks_completed;
> > +                    done = (num_tasks_submitted == num_tasks_completed);
> > +                    
> > +                    if (!done) {
> > +                        /*
> > +                         * Start new threads if we have fewer than the
> > +                         * thread_pool_size
> > +                         */
> > +                        num_working_blocked_threads = checkThreads();
> > +                    
> > +                        /*
> > +                         * Acquire the main loop lock while we're still
> > +                         * holding tasks_lock.  This prevents a race whereby
> > +                         * we release the locks below and then the last task
> > +                         * finishes but we don't get its signal on
> > +                         * main_loop_condition because we weren't holding
> > +                         * the lock.
> > +                         *
> > +                         * To preserve the locking order we must NOT
> > +                         * reacquire tasks_lock after releasing them while
> > +                         * still holding the main loop lock.  This must also
> > +                         * be executed after checkThreads();
> > +                         */
> > +                        main_loop_lock.lock();
> > +                        main_loop_locked = true;
> > +                    }
> > +                } finally {
> > +                    tasks_lock.unlock();
> > +                    tasks_locked = false;
> > +                }
> > +
> > +                if (!done) {
> > +                    if (!main_loop_locked) {
> > +                        main_loop_lock.lock();
> > +                        main_loop_locked = true;
> > +                    }
> > +                    try {
> > +                        main_loop_condition.await();
> > +                    } catch (InterruptedException e) {
> > +                        continue;
> > +                    } finally {
> > +                        main_loop_lock.unlock();
> > +                        main_loop_locked = false;
> > +                    }
> > +                }
> > +
> > +                /*
> > +                 * If we don't execute the if branch above, then the
> > +                 * main_lock cannot be held because neither of the two
> > +                 * places where it could have been acquired would be
> > +                 * executed because done == true.
> > +                 */
> > +            } while (!done);
> > +        } finally {
> > +            if (main_loop_locked) {
> > +                main_loop_lock.unlock();
> > +            }
> > +            if (tasks_locked) {
> > +                tasks_lock.unlock();
> > +            }
> > +        }
> 
> You don't have to change it, but I found this method confusing to
> follow.
> 

I'll see if I can make it clearer next time I update this code.

> > diff --git a/java/runtime/MercuryWorkerThread.java b/java/runtime/MercuryWorkerThread.java
> > new file mode 100644
> > index 0000000..b0aa0a2
> > --- /dev/null
> > +++ b/java/runtime/MercuryWorkerThread.java
> > @@ -0,0 +1,92 @@
> ...
> > +    /**
> > +     * Run.
> > +     * The worker thread executes tasks that it retrives from the pool.
> > +     */
> > +    public void run()
> > +    {
> > +        Task task;
> > +
> > +        do {
> > +            task = null;
> > +            try {
> > +                if (status != ThreadStatus.IDLE) {
> > +                    setStatusIdle();
> > +                }
> > +                task = pool.workerGetTask();
> > +            }
> > +            catch (InterruptedException e) {
> > +                /*
> > +                ** A worker thread has no semantics for this, so we continue
> > +                ** looping.
> > +                */
> > +                continue;
> > +            }
> > +            if (task != null) {
> > +                try {
> > +                    setStatusWorking();
> > +                    task.run();
> > +                    pool.taskDone(task);
> > +                } catch (jmercury.runtime.Exception e) {
> > +                    // The task threw a Mercury exception.
> > +                    pool.taskFailed(task, e);
> > +                    JavaInternal.reportUncaughtException(e);
> > +                } catch (Throwable e) {
> > +                    // Some other error occured. bail out.
> > +                    System.err.println("Uncaught exception: " + e.toString());
> > +                    System.err.println(e.getMessage());
> > +                    e.printStackTrace();
> > +                    System.exit(1);
> 
> Hmm, is there a less clean way to leave, like abort() vs exit() in C?
> 

AIUI abort() and exit() are just as "unclean" as each other.  Maybe I'm
misunderstanding what you mean by "clean".  When the program is aborting
because of a critical error, it's never really going to be "clean".

Do you want me to change something here?
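
For reference, the nearest Java analogue to the exit()/abort() distinction
is probably System.exit() versus Runtime.halt(): System.exit() runs any
registered shutdown hooks before the JVM terminates, while
Runtime.getRuntime().halt() terminates immediately without running them.
The sketch below shows both; FatalExitSketch and its methods are
hypothetical names, not part of the patch.

```java
// Hypothetical sketch; not part of MercuryWorkerThread.
final class FatalExitSketch {
    static final int EXIT_FAILURE = 1;

    private FatalExitSketch() {}

    /** Build the one-line report printed before terminating. */
    static String describe(Throwable e) {
        return "Uncaught exception: " + e;
    }

    /**
     * Report the error and terminate the JVM.
     * If runShutdownHooks is true, use System.exit(), which runs
     * registered shutdown hooks first; otherwise use Runtime.halt(),
     * which terminates immediately without running them.
     */
    static void bailOut(Throwable e, boolean runShutdownHooks) {
        System.err.println(describe(e));
        e.printStackTrace();
        if (runShutdownHooks) {
            System.exit(EXIT_FAILURE);
        } else {
            Runtime.getRuntime().halt(EXIT_FAILURE);
        }
    }
}
```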

> > +static {
> > +    if (null == jmercury.runtime.JavaInternal.exception_reporter) {
> > +        jmercury.runtime.JavaInternal.exception_reporter =
> > +            new ReportUncaughtException();
> > +    }
> > +}
> > +").
> 
> Maybe a method JavaInternal.set_exception_reporter?

I'd prefer that too.  I was following the style that we seem to use in the
other Java code.
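
A sketch of what such a setter might look like.  JavaInternalSketch and
ExceptionReporterSketch are hypothetical stand-ins; the real JavaInternal
class and the type of exception_reporter may differ.  The setter keeps the
"install only if none is set" behaviour of the static block quoted above.

```java
// Hypothetical stand-in for the reporter type used by JavaInternal.
interface ExceptionReporterSketch {
    void reportUncaughtException(Throwable e);
}

// Hypothetical stand-in for jmercury.runtime.JavaInternal.
final class JavaInternalSketch {
    private static ExceptionReporterSketch exceptionReporter;

    private JavaInternalSketch() {}

    /**
     * Install a reporter unless one is already installed.
     * @return true if the given reporter was installed.
     */
    static synchronized boolean setExceptionReporter(
            ExceptionReporterSketch reporter) {
        if (exceptionReporter == null) {
            exceptionReporter = reporter;
            return true;
        }
        return false;
    }

    static synchronized ExceptionReporterSketch getExceptionReporter() {
        return exceptionReporter;
    }
}
```

Making the field private and going through a synchronized setter also
removes the unsynchronized null check from each caller.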


Thanks Peter,

I've made the other changes that you've suggested.  Sorry about the spelling
errors and typos.


-- 
Paul Bone


