[m-rev.] for review: Runtime granularity control changes.

Paul Bone pbone at csse.unimelb.edu.au
Thu Jun 11 16:41:32 AEST 2009


Estimated hours taken: 3 
Branches: main

Names of the runtime granularity control macros, variables and predicates are
now more descriptive and more consistent.

An alternative runtime granularity control predicate and macro are now
available; they consider the number of contexts and all sparks, whereas the
original predicate and macro consider only the number of contexts and sparks
on the global queue.

A new predicate has been added to determine the number of worker threads that
the Mercury runtime is configured to use.


library/par_builtin.m:
	Renamed predicates.
	Conform to changes in runtime/mercury_thread.h
	Added the new predicates.
	Removed some old foreign procedure attributes.

runtime/mercury_context.c:
runtime/mercury_context.h:
	Rename existing runtime granularity control variables and macros.
	Add new runtime granularity control variable and macro.

runtime/mercury_wrapper.c:
runtime/mercury_wrapper.h:
	Export MR_num_threads variable.
	Make this variable an MR_Unsigned.

runtime/mercury_atomic_ops.c:
runtime/mercury_atomic_ops.h:
	Introduce new atomic increment and decrement operations.  These are used
	to maintain the count of local sparks, which is updated outside of a
	critical section.

library/Mmakefile:
	Rebuild the par_builtin module when either runtime/mercury_context.h or
	runtime/mercury_thread.h change. 

Index: library/Mmakefile
===================================================================
RCS file: /home/mercury1/repository/mercury/library/Mmakefile,v
retrieving revision 1.158
diff -u -p -b -r1.158 Mmakefile
--- library/Mmakefile	1 Jun 2009 10:29:01 -0000	1.158
+++ library/Mmakefile	4 Jun 2009 09:22:12 -0000
@@ -434,6 +434,11 @@ $(os_subdir)table_builtin.pic_o \
 	../runtime/mercury_tabling_preds.h \
 	../runtime/mercury_minimal_model.h
 
+$(os_subdir)par_builtin.$O \
+$(os_subdir)par_builtin.pic_o \
+	: ../runtime/mercury_context.h \
+	../runtime/mercury_thread.h
+
 #-----------------------------------------------------------------------------#
 
 # In the past we generated liblibrary.* and then linked
Index: library/par_builtin.m
===================================================================
RCS file: /home/mercury1/repository/mercury/library/par_builtin.m,v
retrieving revision 1.18
diff -u -p -b -r1.18 par_builtin.m
--- library/par_builtin.m	4 Jun 2009 08:07:06 -0000	1.18
+++ library/par_builtin.m	11 Jun 2009 05:26:24 -0000
@@ -69,13 +69,30 @@
     %
 :- impure pred evaluate_parallelism_condition is semidet.
 
-    % par_cond_outstanding_jobs_vs_num_cpus(NumCPUs)
+    % par_cond_contexts_and_global_sparks_vs_num_cpus(NumCPUs)
     %
     % True iff NumCPUs > executable contexts + global sparks.
     %
     % Consider passing MR_num_threads as the argument.
     %
-:- impure pred par_cond_outstanding_jobs_vs_num_cpus(int::in) is semidet.
+:- impure pred par_cond_contexts_and_global_sparks_vs_num_cpus(int::in) 
+    is semidet.
+    
+    % par_cond_contexts_and_all_sparks_vs_num_cpus(NumCPUs)
+    %
+    % True iff NumCPUs > executable contexts + global sparks + local sparks.
+    %
+    % Consider passing MR_num_threads as the argument.
+    %
+:- impure pred par_cond_contexts_and_all_sparks_vs_num_cpus(int::in) is semidet.
+
+    % num_os_threads(Num)
+    %
+    % Num is the number of OS threads the runtime is configured to use; it is
+    % the value of MR_num_threads.  This is the value given to -P in the
+    % MERCURY_OPTIONS environment variable.
+    %
+:- pred num_os_threads(int::out) is det.
 
     % Close the file that was used to log the parallel condition decisions.
     %
@@ -368,11 +385,28 @@ INIT mercury_sys_init_par_builtin_module
 ").
 
 :- pragma foreign_proc("C",
-    par_cond_outstanding_jobs_vs_num_cpus(NumCPUs::in),
-    [will_not_call_mercury, thread_safe, may_not_duplicate],
+    par_cond_contexts_and_global_sparks_vs_num_cpus(NumCPUs::in),
+    [will_not_call_mercury, thread_safe],
+"
+#ifdef MR_LL_PARALLEL_CONJ
+    SUCCESS_INDICATOR =
+        MR_par_cond_contexts_and_global_sparks_vs_num_cpus(NumCPUs);
+  #ifdef MR_DEBUG_RUNTIME_GRANULARITY_CONTROL
+    MR_record_conditional_parallelism_decision(SUCCESS_INDICATOR);
+  #endif
+#else /* !MR_LL_PARALLEL_CONJ: report the predicate's current name */
+    MR_fatal_error(""par_cond_contexts_and_global_sparks_vs_num_cpus is ""
+        ""unavailable in this grade"");
+#endif
+").
+
+:- pragma foreign_proc("C",
+    par_cond_contexts_and_all_sparks_vs_num_cpus(NumCPUs::in),
+    [will_not_call_mercury, thread_safe],
 "
 #ifdef MR_LL_PARALLEL_CONJ
-    SUCCESS_INDICATOR = MR_choose_parallel_over_sequential_cond(NumCPUs);
+    SUCCESS_INDICATOR =
+        MR_par_cond_contexts_and_all_sparks_vs_num_cpus(NumCPUs);
   #ifdef MR_DEBUG_RUNTIME_GRANULARITY_CONTROL
     MR_record_conditional_parallelism_decision(SUCCESS_INDICATOR);
   #endif
@@ -383,8 +417,21 @@ INIT mercury_sys_init_par_builtin_module
 ").
 
 :- pragma foreign_proc("C",
+    num_os_threads(NThreads::out),
+    [will_not_call_mercury, will_not_throw_exception, thread_safe,
+     promise_pure],
+"
+    /*
+    ** MR_num_threads is available in all grades.  Although it is not
+    ** meaningful in non-parallel grades, it still reflects the value
+    ** configured by the user.
+    */
+    NThreads = MR_num_threads;
+").
+
+:- pragma foreign_proc("C",
     par_cond_close_stats_file(IO0::di, IO::uo),
-    [will_not_call_mercury, thread_safe, may_not_duplicate, promise_pure],
+    [will_not_call_mercury, thread_safe, promise_pure, tabled_for_io],
 "
 #ifdef MR_LL_PARALLEL_CONJ
   #ifdef MR_DEBUG_RUNTIME_GRANULARITY_CONTROL
Index: runtime/mercury_atomic_ops.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_atomic_ops.c,v
retrieving revision 1.2
diff -u -p -b -r1.2 mercury_atomic_ops.c
--- runtime/mercury_atomic_ops.c	24 Oct 2007 05:28:52 -0000	1.2
+++ runtime/mercury_atomic_ops.c	11 Jun 2009 05:38:19 -0000
@@ -31,4 +31,22 @@ MR_OUTLINE_DEFN(
     }
 )
 
+MR_OUTLINE_DEFN(
+    void 
+    MR_atomic_inc_int(volatile MR_Integer *addr)
+,
+    {
+        MR_ATOMIC_INC_WORD_BODY;
+    }
+)
+
+MR_OUTLINE_DEFN(
+    void 
+    MR_atomic_dec_int(volatile MR_Integer *addr)
+,
+    {
+        MR_ATOMIC_DEC_WORD_BODY;
+    }
+)
+
 #endif /* MR_LL_PARALLEL_CONJ */
Index: runtime/mercury_atomic_ops.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_atomic_ops.h,v
retrieving revision 1.2
diff -u -p -b -r1.2 mercury_atomic_ops.h
--- runtime/mercury_atomic_ops.h	24 Oct 2007 05:28:52 -0000	1.2
+++ runtime/mercury_atomic_ops.h	11 Jun 2009 05:38:01 -0000
@@ -16,6 +16,7 @@
 
 #include "mercury_std.h"
 
+/*---------------------------------------------------------------------------*/
 #if defined(MR_LL_PARALLEL_CONJ)
 
 /*
@@ -26,8 +27,6 @@ MR_EXTERN_INLINE MR_bool
 MR_compare_and_swap_word(volatile MR_Integer *addr, MR_Integer old,
         MR_Integer new_val);
 
-/*---------------------------------------------------------------------------*/
-
 #if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
 
     /*
@@ -79,11 +78,119 @@ MR_compare_and_swap_word(volatile MR_Int
     }
 #endif
 
+/*---------------------------------------------------------------------------*/
+
+/*
+** Increment the word pointed at by the address.
+*/
+MR_EXTERN_INLINE void
+MR_atomic_inc_int(volatile MR_Integer *addr);
+
+#if defined(__GNUC__) && defined(__x86_64__)
+
+    #define MR_ATOMIC_INC_WORD_BODY                                         \
+        do {                                                                \
+            __asm__ __volatile__(                                           \
+                "lock; incq %0;"                                            \
+                : "=m"(*addr)                                               \
+                : "m"(*addr)                                                \
+                );                                                          \
+        } while (0)
+
+#elif defined(__GNUC__) && defined(__i386__)
+
+    /* Really 486 or better. */
+    #define MR_ATOMIC_INC_WORD_BODY                                         \
+        do {                                                                \
+            __asm__ __volatile__(                                           \
+                "lock; incl %0;"                                            \
+                : "=m"(*addr)                                               \
+                : "m"(*addr)                                                \
+                );                                                          \
+        } while (0)
+
+#elif __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
+
+    /*
+    ** gcc doesn't seem to have an atomic operation for increment, it does have
+    ** one for add though.  We prefer the hand-written increment operations
+    ** above.
+    */
+    #define MR_ATOMIC_INC_WORD_BODY                                         \
+        do {                                                                \
+            __sync_add_and_fetch(addr, 1);                                  \
+        } while (0)
+
+#endif
+
+#ifdef MR_ATOMIC_INC_WORD_BODY
+    MR_EXTERN_INLINE void 
+    MR_atomic_inc_int(volatile MR_Integer *addr)
+    {
+        MR_ATOMIC_INC_WORD_BODY;
+    }
+#endif
+
+/*---------------------------------------------------------------------------*/
+
+/*
+** Decrement the word pointed at by the address.
+*/
+MR_EXTERN_INLINE void
+MR_atomic_dec_int(volatile MR_Integer *addr);
+
+#if defined(__GNUC__) && defined(__x86_64__)
+
+    #define MR_ATOMIC_DEC_WORD_BODY                                         \
+        do {                                                                \
+            __asm__ __volatile__(                                           \
+                "lock; decq %0;"                                            \
+                : "=m"(*addr)                                               \
+                : "m"(*addr)                                                \
+                );                                                          \
+        } while (0)
+
+#elif defined(__GNUC__) && defined(__i386__)
+
+    /* Really 486 or better. */
+    #define MR_ATOMIC_DEC_WORD_BODY                                         \
+        do {                                                                \
+            __asm__ __volatile__(                                           \
+                "lock; decl %0;"                                            \
+                : "=m"(*addr)                                               \
+                : "m"(*addr)                                                \
+                );                                                          \
+        } while (0)
+
+#elif __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
+
+    /*
+    ** gcc doesn't seem to have an atomic operation for decrement, it does have
+    ** one for subtract though.  We prefer the hand-written decrement
+    ** operations above.
+    */
+    #define MR_ATOMIC_DEC_WORD_BODY                                         \
+        do {                                                                \
+            __sync_sub_and_fetch(addr, 1);                                  \
+        } while (0)
+
+#endif
+
+#ifdef MR_ATOMIC_DEC_WORD_BODY
+    MR_EXTERN_INLINE void 
+    MR_atomic_dec_int(volatile MR_Integer *addr)
+    {
+        MR_ATOMIC_DEC_WORD_BODY;
+    }
+#endif
+
+#endif /* MR_LL_PARALLEL_CONJ */
+/*---------------------------------------------------------------------------*/
+
 /*
 ** If we don't have definitions available for this compiler or architecture
 ** then we will get a link error in low-level .par grades.  No other grades
 ** currently require any atomic ops.
 */
 
-#endif /* MR_LL_PARALLEL_CONJ */
 #endif /* not MERCURY_ATOMIC_OPS_H */
Index: runtime/mercury_context.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.c,v
retrieving revision 1.62
diff -u -p -b -r1.62 mercury_context.c
--- runtime/mercury_context.c	4 Jun 2009 08:07:09 -0000	1.62
+++ runtime/mercury_context.c	11 Jun 2009 05:39:25 -0000
@@ -80,7 +80,8 @@ static MR_Context       *free_small_cont
 
 #ifdef  MR_LL_PARALLEL_CONJ
 int volatile MR_num_idle_engines = 0;
-int volatile MR_num_outstanding_contexts_and_sparks = 0;
+int volatile MR_num_outstanding_contexts_and_global_sparks = 0;
+int volatile MR_num_outstanding_contexts_and_all_sparks = 0;
 
 static MercuryLock MR_par_cond_stats_lock;
 #endif
@@ -321,7 +322,8 @@ MR_create_context(const char *id, MR_Con
     MR_LOCK(&free_context_list_lock, "create_context");
 
 #ifdef MR_LL_PARALLEL_CONJ
-    MR_num_outstanding_contexts_and_sparks++;
+    MR_num_outstanding_contexts_and_global_sparks++;
+    MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);
 #endif
 
     /*
@@ -380,7 +382,8 @@ MR_destroy_context(MR_Context *c)
 
     MR_LOCK(&free_context_list_lock, "destroy_context");
 #ifdef MR_LL_PARALLEL_CONJ
-    MR_num_outstanding_contexts_and_sparks--;
+    MR_num_outstanding_contexts_and_global_sparks--;
+    MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
 #endif
 
     switch (c->MR_ctxt_size) {
@@ -538,7 +541,8 @@ MR_schedule_spark_globally(const MR_Spar
 {
     MR_LOCK(&MR_runqueue_lock, "schedule_spark_globally");
     MR_wsdeque_push_bottom(&MR_spark_queue, proto_spark);
-    MR_num_outstanding_contexts_and_sparks++;
+    MR_num_outstanding_contexts_and_global_sparks++;
+    MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);
     MR_SIGNAL(&MR_runqueue_cond);
     MR_UNLOCK(&MR_runqueue_lock, "schedule_spark_globally");
 }
@@ -617,7 +621,8 @@ MR_define_entry(MR_do_runnext);
         /* Check if the global spark queue is nonempty. */
         if (MR_wsdeque_take_top(&MR_spark_queue, &spark)) {
             MR_num_idle_engines--;
-            MR_num_outstanding_contexts_and_sparks--;
+            MR_num_outstanding_contexts_and_global_sparks--;
+            MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);
             goto ReadySpark;
         }
 
Index: runtime/mercury_context.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_context.h,v
retrieving revision 1.48
diff -u -p -b -r1.48 mercury_context.h
--- runtime/mercury_context.h	4 Jun 2009 08:07:09 -0000	1.48
+++ runtime/mercury_context.h	11 Jun 2009 06:07:22 -0000
@@ -59,6 +59,7 @@
 #include "mercury_goto.h"       /* for MR_GOTO() */
 #include "mercury_conf.h"       /* for MR_CONSERVATIVE_GC */
 #include "mercury_backjump.h"   /* for MR_BackJumpHandler, etc */
+#include "mercury_atomic_ops.h" /* for MR_atomic_* */
 
 #ifdef  MR_THREAD_SAFE
   #define MR_IF_THREAD_SAFE(x)  x
@@ -392,12 +393,12 @@ extern  MR_PendingContext   *MR_pending_
 
   /*
   ** The number of contexts that are not in the free list (i.e. are executing
-  ** or suspended) plus the number of sparks in the spark queue.  We count
-  ** those sparks as they can quickly accumulate on the spark queue before any
-  ** of them are taken up for execution.  Once they do get taken up, many
-  ** contexts would need to be allocated to execute them.  Sparks not on the
-  ** spark queue are currently guaranteed to be executed on their originating
-  ** context so won't cause allocation of more contexts.
+  ** or suspended) plus the number of sparks in the global spark queue.
+  ** We count those sparks as they can quickly accumulate on the spark queue
+  ** before any of them are taken up for execution. Once they do get taken up,
+  ** many contexts would need to be allocated to execute them. Sparks not
+  ** on the global spark queue are currently guaranteed to be executed
+  ** on their originating context so won't cause allocation of more contexts.
   **
   ** What we are mainly interested in here is preventing too many contexts from
   ** being allocated, as each context is quite large and we can quickly run out
@@ -406,7 +407,15 @@ extern  MR_PendingContext   *MR_pending_
   ** scanned.  (Getting the garbage collector not to scan contexts on the free
   ** list should be possible though.)
   */
-  extern volatile int   MR_num_outstanding_contexts_and_sparks;
+  extern volatile int   MR_num_outstanding_contexts_and_global_sparks;
+  
+  /*
+  ** As above, except that sparks on local spark queues are also counted even
+  ** though they don't represent _parallel_ work.  Since local queues are
+  ** manipulated without locking this variable must be modified by atomic
+  ** instructions, even when done from within a critical section.
+  */
+  extern volatile int   MR_num_outstanding_contexts_and_all_sparks;
 #endif  /* !MR_LL_PARALLEL_CONJ */
 
 /*---------------------------------------------------------------------------*/
@@ -749,10 +758,18 @@ extern  void        MR_schedule_context(
 
   #define MR_fork_globally_criteria                                           \
     (MR_num_idle_engines != 0 &&                                              \
-    MR_num_outstanding_contexts_and_sparks < MR_max_outstanding_contexts)
+    MR_num_outstanding_contexts_and_global_sparks < MR_max_outstanding_contexts)
+
+  /*
+  ** These macros may be used as conditions for runtime parallelism decisions.
+  ** They return nonzero when parallelism is recommended (because there are
+  ** enough CPUs to assign work to).  The argument is parenthesised.
+  */
+  #define MR_par_cond_contexts_and_global_sparks_vs_num_cpus(target_cpus)     \
+      (MR_num_outstanding_contexts_and_global_sparks < (target_cpus))
 
-  #define MR_choose_parallel_over_sequential_cond(target_cpus)                \
-      (MR_num_outstanding_contexts_and_sparks < target_cpus)
+  #define MR_par_cond_contexts_and_all_sparks_vs_num_cpus(target_cpus)        \
+      (MR_num_outstanding_contexts_and_all_sparks < (target_cpus))
 
   #define MR_schedule_spark_locally(spark)                                    \
     do {                                                                      \
@@ -764,6 +781,7 @@ extern  void        MR_schedule_context(
         */                                                                    \
         ssl_ctxt = MR_ENGINE(MR_eng_this_context);                            \
         MR_wsdeque_push_bottom(&ssl_ctxt->MR_ctxt_spark_deque, (spark));      \
+        MR_atomic_inc_int(&MR_num_outstanding_contexts_and_all_sparks);       \
     } while (0)
 
   #define MR_join_and_continue(sync_term, join_label)                         \
@@ -819,6 +837,7 @@ extern  void        MR_schedule_context(
         jnc_popped = MR_wsdeque_pop_bottom(&jnc_ctxt->MR_ctxt_spark_deque,    \
             &jnc_spark);                                                      \
         if (jnc_popped) {                                                     \
+            MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);   \
             MR_GOTO(jnc_spark.MR_spark_resume);                               \
         } else {                                                              \
             MR_runnext();                                                     \
@@ -842,6 +861,7 @@ extern  void        MR_schedule_context(
             ** the current context.                                           \
             */                                                                \
             MR_UNLOCK(&MR_sync_term_lock, "continue_2 i");                    \
+            MR_atomic_dec_int(&MR_num_outstanding_contexts_and_all_sparks);   \
             MR_GOTO(jnc_spark.MR_spark_resume);                               \
         } else {                                                              \
             /*                                                                \
Index: runtime/mercury_wrapper.c
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wrapper.c,v
retrieving revision 1.195
diff -u -p -b -r1.195 mercury_wrapper.c
--- runtime/mercury_wrapper.c	1 Jun 2009 09:28:01 -0000	1.195
+++ runtime/mercury_wrapper.c	9 Jun 2009 07:42:09 -0000
@@ -289,7 +289,7 @@ static  char        *MR_mem_usage_report
 
 static  int         MR_num_output_args = 0;
 
-unsigned int        MR_num_threads = 1;
+MR_Unsigned         MR_num_threads = 1;
 
 static  MR_bool     MR_print_table_statistics = MR_FALSE;
 
Index: runtime/mercury_wrapper.h
===================================================================
RCS file: /home/mercury1/repository/mercury/runtime/mercury_wrapper.h,v
retrieving revision 1.81
diff -u -p -b -r1.81 mercury_wrapper.h
--- runtime/mercury_wrapper.h	21 May 2008 02:48:59 -0000	1.81
+++ runtime/mercury_wrapper.h	9 Jun 2009 07:38:02 -0000
@@ -256,6 +256,8 @@ extern	MR_Unsigned	MR_contexts_per_threa
 */
 extern	MR_Unsigned	MR_max_outstanding_contexts;
 
+extern  MR_Unsigned MR_num_threads;
+
 /* file names for the mdb debugging streams */
 extern	const char	*MR_mdb_in_filename;
 extern	const char	*MR_mdb_out_filename;

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 481 bytes
Desc: Digital signature
URL: <http://lists.mercurylang.org/archives/reviews/attachments/20090611/8f7d3513/attachment.sig>


More information about the reviews mailing list