[m-rev.] For post-commit review: Loop control transformation, source-to-source component

Paul Bone pbone at csse.unimelb.edu.au
Tue Sep 27 10:48:50 AEST 2011


For post-commit review by Zoltan.


Implement the source-to-source part of the loop control transformation.  The
remaining part is the code generation for code that is to be spawned off.  It
must be handled in the code generator since it uses the parent stack pointer in
many cases.

I'm committing this now so that Zoltan can begin to review it while I work on
the code generator component.

compiler/par_loop_control.m:
    This new file contains the source-to-source part of the parallel loop
    control transformation.

compiler/transform_hlds.m:
    Include the par_loop_control module within the transform_hlds module.

compiler/mercury_compile_middle_passes.m:
    Call the loop control transformation at stage 206, after the dependent
    parallel conjunction transformation.

    Move the last call optimisation pass from stage 175 to stage 210, after
    loop control, since running it first would most likely prevent loop
    control from working.  Where both transformations are applicable, the
    loop control transformation is preferred.

compiler/options.m:
    Add new options for loop control.

compiler/handle_options.m:
    Disable loop control if we're not in a grade that supports parallel
    conjunctions.

    Fix other tests that checked only for a parallel grade when they should
    have checked for parallel conjunction support.

compiler/hlds_goal.m:
    Add the feature_do_not_tailcall feature.

compiler/call_gen.m:
    Mark LLDS calls as calls that may not have last call optimisation applied
    to them if the goal has the feature_do_not_tailcall feature set in its
    HLDS goal info.

compiler/goal_util.m:
    Create a new predicate, expand_plain_conj.  It returns a list of the
    conjuncts of a plain conjunction; for any other goal it returns that goal
    in a singleton list.
    XXX: Could someone review the name of this predicate?

compiler/hlds_pred.m:
    Add a symbol for the new transformation in the pred_transformation type.

    Corrected a comment to match the arguments in the predicate it refers to.

compiler/prog_util.m:
    Add support to make_pred_name for creating names for loop control
    predicates.

compiler/dep_par_conj.m:
    Fix grammar in a comment.

compiler/saved_vars.m:
    Conform to the change in hlds_goal.m.

compiler/layout_out.m:
    Conform to the change in hlds_pred.m.

runtime/mercury_par_builtin.[ch]:
    Add support for lc_wait_free_slot/2, the blocking version of
    lc_get_free_slot/2.  This means that other loop control builtins have
    changed; for instance, lc_join_and_terminate/2 must wake up a context
    blocked in lc_wait_free_slot/2 after freeing the slot it was using.

    Use a spin lock in the loop control structure rather than a POSIX mutex.
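
The slot protocol described in this entry can be sketched in C as follows.
This is only an illustration, not the runtime code: every name prefixed with
sketch_ is hypothetical, the slot count is an arbitrary constant, and a
polling loop stands in for suspending a context and waking it up again.

```c
#include <stdatomic.h>
#include <stdbool.h>

#define SKETCH_NUM_SLOTS 4   /* hypothetical fixed slot count */

/* Hypothetical stand-in for a loop control structure. */
typedef struct {
    atomic_flag lock;                      /* spin lock guarding the fields below */
    bool        in_use[SKETCH_NUM_SLOTS];  /* which slots are occupied */
    int         outstanding;               /* number of occupied slots */
} sketch_loop_control;

static void sketch_lock(sketch_loop_control *lc)
{
    /* Spin until the flag was previously clear, i.e. we acquired the lock. */
    while (atomic_flag_test_and_set_explicit(&lc->lock, memory_order_acquire)) {
    }
}

static void sketch_unlock(sketch_loop_control *lc)
{
    atomic_flag_clear_explicit(&lc->lock, memory_order_release);
}

void sketch_lc_init(sketch_loop_control *lc)
{
    atomic_flag_clear(&lc->lock);
    for (int i = 0; i < SKETCH_NUM_SLOTS; i++) {
        lc->in_use[i] = false;
    }
    lc->outstanding = 0;
}

/* Blocking acquire: return only once a slot has been claimed.
 * Assumption: polling replaces suspending the calling context. */
int sketch_lc_wait_free_slot(sketch_loop_control *lc)
{
    for (;;) {
        int slot = -1;
        sketch_lock(lc);
        for (int i = 0; i < SKETCH_NUM_SLOTS; i++) {
            if (!lc->in_use[i]) {
                lc->in_use[i] = true;
                lc->outstanding++;
                slot = i;
                break;
            }
        }
        sketch_unlock(lc);
        if (slot >= 0) {
            return slot;
        }
    }
}

/* Called by a spawned-off task when it finishes: free its slot so that
 * a caller polling in sketch_lc_wait_free_slot can proceed. */
void sketch_lc_join_and_terminate(sketch_loop_control *lc, int slot)
{
    sketch_lock(lc);
    lc->in_use[slot] = false;
    lc->outstanding--;
    sketch_unlock(lc);
}

/* The single barrier at the end of the loop: wait for all slots to drain. */
void sketch_lc_finish(sketch_loop_control *lc)
{
    for (;;) {
        sketch_lock(lc);
        int left = lc->outstanding;
        sketch_unlock(lc);
        if (left == 0) {
            return;
        }
    }
}
```

The point of the sketch is the coupling between the two operations: once
acquiring a slot can block, releasing a slot has to make that release visible
to the waiter, which is why the join builtin had to change as well.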

runtime/mercury_wrapper.[ch]:
    Add support for a runtime variable, the number of contexts per loop control.
    This can be controlled with a MERCURY_OPTIONS option.

mdbcomp/program_representation.m:
    Include lc_wait_free_slot/2 in the list of external predicates.

mdbcomp/mdbcomp.goal_path.m:
    Add two new predicates goal_path_remove_first/3 and goal_path_get_first/2.

library/par_builtin.m:
    Add new builtins to support the loop control transformation:

        lc_wait_free_slot/2 will block the context until a new slot is
        available.

        lc_default_num_contexts/1 will return the number of contexts to use, by
        default, for a loop-controlled loop.

    Add myself as an author of this module.

doc/user_guide.texi:
    Document the runtime --num-contexts-per-lc-per-thread option.  It is
    currently commented out since it is not intended for users, at least for
    now.

    Document the loop control options for the compiler.

---

The change below was written by Zoltan; I reviewed it when I applied his diff
to my workspace.

Allow the compiler to mark calls in the LLDS as calls that cannot have last
call optimization applied to them. Paul will soon need this capability
in order to implement parallel conjunctions in which earlier conjuncts
are spawned off, and later conjuncts contain recursive calls, but the
earlier conjuncts need the stack frame.

compiler/llds.m:
        Add a flag to det and semi calls. (Model_non calls have had a similar
        flag for a long time, for a totally different reason.)

compiler/call_gen.m:
        By default, say that det and semi calls may have LCO applied to them.

compiler/jumpopt.m:
        Apply LCO to det and semi calls only if this flag allows it.

compiler/opt_debug.m:
        Include the flag in debugging dumps.
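
As a rough model of how the flag travels from the code generator to the
optimizer, here is a small C sketch.  It is not the compiler's code (which is
written in Mercury): all sketch_ names are hypothetical, and nondet calls,
which carry a separate flag, are simply treated as never eligible.

```c
#include <stdbool.h>

/* Mirrors the two-valued flag added to det and semi calls. */
typedef enum {
    SKETCH_DO_NOT_ALLOW_LCO,
    SKETCH_ALLOW_LCO
} sketch_allow_lco;

typedef enum {
    SKETCH_CALL_DET,
    SKETCH_CALL_SEMIDET,
    SKETCH_CALL_NONDET
} sketch_call_kind;

typedef struct {
    sketch_call_kind kind;
    sketch_allow_lco allow_lco;  /* meaningful only for det and semidet calls */
} sketch_call_model;

/* Code generator side: LCO is allowed by default, and forbidden when the
 * goal carries the do-not-tailcall feature. */
sketch_call_model sketch_prepare_for_call(sketch_call_kind kind,
    bool has_do_not_tailcall)
{
    sketch_call_model cm;
    cm.kind = kind;
    cm.allow_lco = has_do_not_tailcall
        ? SKETCH_DO_NOT_ALLOW_LCO
        : SKETCH_ALLOW_LCO;
    return cm;
}

/* Optimizer side: only consider a det or semidet call for last call
 * optimization if its flag allows it. */
bool sketch_may_apply_lco(sketch_call_model cm)
{
    if (cm.kind == SKETCH_CALL_NONDET) {
        return false;  /* assumption: nondet tail calls are not modelled here */
    }
    return cm.allow_lco == SKETCH_ALLOW_LCO;
}
```

Carrying the decision inside the call model means the optimizer needs no
access to the HLDS goal features; it only inspects the instruction.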

diff --git a/compiler/call_gen.m b/compiler/call_gen.m
index d5de3aa..426faaa 100644
--- a/compiler/call_gen.m
+++ b/compiler/call_gen.m
@@ -99,7 +99,7 @@ generate_call(CodeModel, PredId, ProcId, ArgVars, GoalInfo, Code, !CI) :-
     kill_dead_input_vars(ArgsInfos, GoalInfo, NonLiveOutputs, !CI),
 
     % Figure out what the call model is.
-    prepare_for_call(CodeModel, CallModel, TraceResetCode, !CI),
+    prepare_for_call(CodeModel, GoalInfo, CallModel, TraceResetCode, !CI),
 
     % Make the call. Note that the construction of CallCode will be moved
     % *after* the code that computes ReturnLiveLvalues.
@@ -232,7 +232,7 @@ generate_main_generic_call(_OuterCodeModel, GenericCall, Args, Modes, Det,
     extra_livevals(FirstImmInput, ExtraLiveVals),
     set.insert_list(ExtraLiveVals, LiveVals0, LiveVals),
 
-    call_gen.prepare_for_call(CodeModel, CallModel, TraceCode, !CI),
+    prepare_for_call(CodeModel, GoalInfo, CallModel, TraceCode, !CI),
 
     % Make the call.
     get_next_label(ReturnLabel, !CI),
@@ -449,17 +449,24 @@ generic_call_nonvar_setup(cast(_), _, _, _, _, !CI) :-
 
 %---------------------------------------------------------------------------%
 
-:- pred prepare_for_call(code_model::in, call_model::out,
+:- pred prepare_for_call(code_model::in, hlds_goal_info::in, call_model::out,
     llds_code::out, code_info::in, code_info::out) is det.
 
-prepare_for_call(CodeModel, CallModel, TraceCode, !CI) :-
+prepare_for_call(CodeModel, GoalInfo, CallModel, TraceCode, !CI) :-
     succip_is_used(!CI),
     (
+        goal_info_has_feature(GoalInfo, feature_do_not_tailcall)
+    ->
+        AllowLCO = do_not_allow_lco
+    ;
+        AllowLCO = allow_lco
+    ),
+    (
         CodeModel = model_det,
-        CallModel = call_model_det
+        CallModel = call_model_det(AllowLCO)
     ;
         CodeModel = model_semi,
-        CallModel = call_model_semidet
+        CallModel = call_model_semidet(AllowLCO)
     ;
         CodeModel = model_non,
         may_use_nondet_tailcall(!.CI, TailCallStatus),
diff --git a/compiler/dep_par_conj.m b/compiler/dep_par_conj.m
index af2132f..2259bda 100644
--- a/compiler/dep_par_conj.m
+++ b/compiler/dep_par_conj.m
@@ -288,9 +288,9 @@ sync_dep_par_conjs_in_proc(PredId, ProcId, IgnoreVars, !ModuleInfo,
         globals.lookup_bool_option(Globals, allow_some_paths_only_waits,
             AllowSomePathsOnly),
 
-        % We rely on dependency information in order to determine which calls a
-        % recursive.  The information is stored within !ModuleInfo so doesn't
-        % need to be kept here, this call simply forces an update.
+        % We rely on dependency information in order to determine which calls
+        % are recursive.  The information is stored within !ModuleInfo so
+        % doesn't need to be kept here, this call simply forces an update.
         module_info_rebuild_dependency_info(!ModuleInfo, _),
 
         GoalBeforeDepParConj = !.Goal,
diff --git a/compiler/goal_util.m b/compiler/goal_util.m
index c4ee2e0..f912d88 100644
--- a/compiler/goal_util.m
+++ b/compiler/goal_util.m
@@ -278,6 +278,13 @@
 :- pred create_conj(hlds_goal::in, hlds_goal::in, conj_type::in,
     hlds_goal::out) is det.
 
+    % expand_plain_conj(Goal, Goals).
+    %
+    % If Goal represents a plain conjunction then Goals are its conjuncts,
+    % otherwise Goals is a singleton list containing Goal.
+    %
+:- pred expand_plain_conj(hlds_goal::in, list(hlds_goal)::out) is det.
+
     % can_reorder_goals_old(ModuleInfo, VarTypes, FullyStrict,
     %   InstmapBeforeGoal1, Goal1, InstmapBeforeGoal2, Goal2).
     %
@@ -1479,6 +1486,16 @@ create_conj_from_list(Conjuncts, ConjType, ConjGoal) :-
 
 %-----------------------------------------------------------------------------%
 
+expand_plain_conj(Goal, Conjs) :-
+    Goal = hlds_goal(GoalExpr, _),
+    ( GoalExpr = conj(plain_conj, ConjsPrime) ->
+        Conjs = ConjsPrime
+    ;
+        Conjs = [Goal]
+    ).
+
+%-----------------------------------------------------------------------------%
+
 can_reorder_goals_old(ModuleInfo, VarTypes, FullyStrict,
         InstmapBeforeEarlierGoal, EarlierGoal,
         InstmapBeforeLaterGoal, LaterGoal) :-
diff --git a/compiler/handle_options.m b/compiler/handle_options.m
index a58f9bf..a79de53 100644
--- a/compiler/handle_options.m
+++ b/compiler/handle_options.m
@@ -36,7 +36,7 @@
     % handle_given_options(Args, OptionArgs, NonOptionArgs, Link,
     %   Errors, Globals, !IO).
     %
-:- pred handle_given_options(list(string)::in, 
+:- pred handle_given_options(list(string)::in,
     list(string)::out, list(string)::out, bool::out, list(string)::out,
     globals::out, io::di, io::uo) is det.
 
@@ -485,7 +485,7 @@ check_option_values(!OptionTable, Target, GC_Method, TagsMethod,
             "\t(must be `posix', `cygwin', `msys' or `windows').",
             !Errors)
     ).
-    
+
 :- pred add_error(string::in, list(string)::in, list(string)::out) is det.
 
 add_error(Error, Errors0, Errors) :-
@@ -584,14 +584,15 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
     ;
         TagsMethod = TagsMethod0
     ),
-        
+
+    current_grade_supports_par_conj(!.Globals, GradeSupportsParConj),
     globals.lookup_bool_option(!.Globals, parallel, Parallel),
     globals.lookup_bool_option(!.Globals, threadscope, Threadscope),
     (
-        Parallel = no,
+        GradeSupportsParConj = no,
         Threadscope = yes
     ->
-        add_error("'threadscope' grade component requires a parallel grade", 
+        add_error("'threadscope' grade component requires a parallel grade",
             !Errors)
     ;
         true
@@ -605,20 +606,33 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
     (
         ImplicitParallelism = yes,
         (
-            Parallel = yes,
+            GradeSupportsParConj = yes,
             globals.lookup_string_option(!.Globals, feedback_file,
                 FeedbackFile),
             (
                 FeedbackFile = ""
             ->
                 add_error(
-                    "'--implicit-parallelism' requires '--feedback-file'", 
+                    "'--implicit-parallelism' requires '--feedback-file'",
                     !Errors)
             ;
                 true
             )
         ;
-            Parallel = no,
+            % Report an error when used in parallel grades without parallel
+            % conjunction support.  In non-parallel grades simply ignore
+            % --implicit-parallelism.
+            GradeSupportsParConj = no,
+            (
+                Parallel = yes,
+                add_error(
+                    "'--implicit-parallelism' requires a grade that " ++
+                    "supports parallel conjunctions, use a low-level C " ++
+                    "grade without trailing.",
+                    !Errors)
+            ;
+                Parallel = no
+            ),
             globals.set_option(implicit_parallelism, bool(no), !Globals)
         )
     ;
@@ -629,6 +643,14 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
     option_implies(implicit_parallelism, pre_implicit_parallelism_simplify,
         bool(yes), !Globals),
 
+    % Loop control is not applicable in non-parallel grades.
+    (
+        GradeSupportsParConj = yes
+    ;
+        GradeSupportsParConj = no,
+        globals.set_option(par_loop_control, bool(no), !Globals)
+    ),
+
     % Generating IL implies:
     %   - gc_method `automatic' and no heap reclamation on failure
     %     Because GC is handled automatically by the .NET CLR
@@ -673,7 +695,7 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
     %     Because we use 32-bit integers which may be different to that of
     %     the host compiler.
 
-    ( 
+    (
         Target = target_il,
         globals.set_gc_method(gc_automatic, !Globals),
         globals.set_option(gc, string("automatic"), !Globals),
@@ -754,7 +776,7 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
     %   - store nondet environments on the heap
     %         Because Java has no way of allocating structs on the stack.
     %   - pretest-equality-cast-pointers
-    %   - no library grade installation check with `mmc --make'. 
+    %   - no library grade installation check with `mmc --make'.
     %   - cross compiling
     %     Because ints in Java are 32-bits wide which may be different to
     %     that of the host compiler.
@@ -816,7 +838,7 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
     %     Because Erlang has arbitrary precision integers which may
     %     different to that of the host compiler.
 
-    ( 
+    (
         Target = target_erlang,
         globals.set_gc_method(gc_automatic, !Globals),
         globals.set_option(gc, string("automatic"), !Globals),
@@ -847,7 +869,7 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
 
     % Generating assembler via the gcc back-end requires
     % using high-level code.
-    ( 
+    (
         Target = target_asm,
         globals.set_option(highlevel_code, bool(yes), !Globals),
         globals.set_option(highlevel_data, bool(no), !Globals)
@@ -953,11 +975,11 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
     % and they don't need to be recreated when compiling to C.
     option_implies(invoked_by_mmc_make,
         generate_mmc_make_module_dependencies, bool(no), !Globals),
-    
+
     % --libgrade-install-check only works with --make
-    option_neg_implies(make, libgrade_install_check, bool(no), !Globals), 
+    option_neg_implies(make, libgrade_install_check, bool(no), !Globals),
 
-    % `--transitive-intermodule-optimization' and `--make' are 
+    % `--transitive-intermodule-optimization' and `--make' are
     % not compatible with each other.
     %
     globals.lookup_bool_option(!.Globals, transitive_optimization,
@@ -1330,11 +1352,11 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
         globals.set_option(coverage_profiling, bool(yes), !Globals),
         globals.set_option(coverage_profiling_static, bool(no), !Globals),
         globals.set_option(profile_deep_coverage_after_goal, bool(yes),
-            !Globals), 
+            !Globals),
         globals.set_option(profile_deep_coverage_branch_ite, bool(yes),
-            !Globals), 
+            !Globals),
         globals.set_option(profile_deep_coverage_branch_switch, bool(yes),
-            !Globals), 
+            !Globals),
         globals.set_option(profile_deep_coverage_branch_disj, bool(yes),
             !Globals),
 
@@ -1343,7 +1365,7 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
         % coverage information propagation easier at the expense of
         % inserting more coverage points.
         globals.set_option(profile_deep_coverage_use_portcounts, bool(no),
-            !Globals), 
+            !Globals),
         globals.set_option(profile_deep_coverage_use_trivial, bool(no),
             !Globals)
     ;
@@ -1361,7 +1383,7 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
     % want so we disable inlining with deep profiling by default.  The
     % user can re-enable it with the `--profile-optimized' option.  Leave
     % inlineing enabled when profiling for implicit parallelism.
-    % 
+    %
     globals.lookup_bool_option(!.Globals, prof_optimized, ProfOptimized),
     (
         ProfOptimized = no,
@@ -1456,7 +1478,7 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
             % For the IL backend we turn off optimize_peep
             % so that we don't optimize away references to the
             % local variables of a procedure.
-            ( 
+            (
                 Target = target_il,
                 globals.set_option(optimize_peep, bool(no), !Globals)
             ;
@@ -1971,12 +1993,12 @@ convert_options_to_globals(OptionTable0, Target, GC_Method, TagsMethod0,
             intermod_directories, IntermodDirs0),
         globals.set_option(intermod_directories,
             accumulating(ExtraIntermodDirs ++ IntermodDirs0), !Globals),
-        
+
         ExtraInitDirs = list.map(
             (func(MercuryLibDir) =
                 MercuryLibDir / "modules" / GradeString
             ), MercuryLibDirs),
-        
+
         globals.lookup_accumulating_option(!.Globals,
             init_file_directories, InitDirs1),
         globals.set_option(init_file_directories,
@@ -2292,7 +2314,7 @@ postprocess_options_lowlevel(!Globals) :-
     ),
     globals.set_option(static_code_addresses, bool(StaticCodeAddrs), !Globals).
 
-    
+
     % option_implies(SourceBoolOption, ImpliedOption, ImpliedOptionValue):
     % If the SourceBoolOption is set to yes, then the ImpliedOption is set
     % to ImpliedOptionValue.
@@ -2427,7 +2449,7 @@ long_usage(!IO) :-
     % copies of the long usage message. We can print both a short and along
     % usage message, but there is no simple way to avoid that.
     library.version(Version),
-    io.write_strings(["Name: mmc -- Melbourne Mercury Compiler, version ", 
+    io.write_strings(["Name: mmc -- Melbourne Mercury Compiler, version ",
         Version, "\n"], !IO),
     io.write_string("Copyright: Copyright (C) 1993-2011 " ++
         "The University of Melbourne\n", !IO),
@@ -2499,7 +2521,7 @@ string_to_grade_component(FilterDesc, Comp, !Comps, !Errors) :-
     ).
 
     % filter_grade(FilterPred, Components, GradeString, !Grades, !Errors):
-    % 
+    %
     % Convert `GradeString' into a list of grade component strings, and
     % then check whether the given grade should be filtered from the
     % library grade set by applying the closure `FilterPred(Components)',
@@ -2553,7 +2575,7 @@ must_not_contain(OmitComponents, GradeComponents) :-
     is det.
 
 grade_string_to_comp_strings(GradeString, MaybeGrade, !Errors) :-
-    ( 
+    (
         split_grade_string(GradeString, ComponentStrs),
         StrToComp = (pred(Str::in, Str::out) is semidet :-
             grade_component_table(Str, _, _, _, _)
@@ -2868,7 +2890,7 @@ grade_component_table("erlang", comp_gcc_ext, [
 grade_component_table("par", comp_par, [parallel - bool(yes)], no, yes).
 
     % Threadscope profiling in parallel grades.
-grade_component_table("threadscope", comp_par_threadscope, 
+grade_component_table("threadscope", comp_par_threadscope,
     [threadscope - bool(yes)], no, yes).
 
     % GC components.
@@ -2941,7 +2963,7 @@ grade_component_table("dmmos", comp_minimal_model,
     use_minimal_model_own_stacks - bool(yes),
     minimal_model_debug - bool(yes)], no, yes).
 
-grade_component_table("spf", comp_single_prec_float, 
+grade_component_table("spf", comp_single_prec_float,
     [single_prec_float - bool(yes),
     unboxed_float - bool(yes)], no, yes).
 
@@ -2967,20 +2989,20 @@ grade_component_table("exts", comp_stack_extend,
 grade_component_table("stseg", comp_stack_extend,
     [extend_stacks_when_needed - bool(no), stack_segments - bool(yes)],
     no, yes).
-    
+
     % Region-based memory managment components
 grade_component_table("rbmm", comp_regions,
     [use_regions - bool(yes),
     use_regions_debug - bool(no), use_regions_profiling - bool(no)],
-    no, yes).  
+    no, yes).
 grade_component_table("rbmmd", comp_regions,
     [use_regions - bool(yes),
     use_regions_debug - bool(yes), use_regions_profiling - bool(no)],
-    no, yes).  
+    no, yes).
 grade_component_table("rbmmp", comp_regions,
     [use_regions - bool(yes),
     use_regions_debug - bool(no), use_regions_profiling - bool(yes)],
-    no, yes).  
+    no, yes).
 grade_component_table("rbmmdp", comp_regions,
     [use_regions - bool(yes),
     use_regions_debug - bool(yes), use_regions_profiling - bool(yes)],
diff --git a/compiler/hlds_goal.m b/compiler/hlds_goal.m
index 57c97ac..e0f3ab2 100644
--- a/compiler/hlds_goal.m
+++ b/compiler/hlds_goal.m
@@ -1560,10 +1560,16 @@
             % transformation but should be removed by the end of mode
             % checking.
 
-    ;       feature_contains_stm_inner_outer.
+    ;       feature_contains_stm_inner_outer
             % This goal is a goal inside an atomic scope, for which the calls
             % to convert inner and outer variables have been inserted.
 
+    ;       feature_do_not_tailcall.
+            % This goal is a call that should not be executed as a tail call.
+            % Currently this is only used by the loop control optimization
+            % since a spawned off task may need to use the parent's stack frame
+            % even after the parent makes a tail call.
+
 %-----------------------------------------------------------------------------%
 %
 % The rename_var* predicates take a structure and a mapping from var -> var
diff --git a/compiler/hlds_pred.m b/compiler/hlds_pred.m
index 3eb12c0..02ced6b 100644
--- a/compiler/hlds_pred.m
+++ b/compiler/hlds_pred.m
@@ -487,6 +487,7 @@
                 int % The procedure number of the original procedure.
             )
     ;       transform_dependent_parallel_conjunction
+    ;       transform_parallel_loop_control
     ;       transform_return_via_ptr(
                 proc_id,
                     % The id of the procedure this predicate is derived from.
@@ -557,8 +558,8 @@
     pred_info::out) is det.
 
     % pred_info_create(ModuleName, SymName, PredOrFunc, Context, Origin,
-    %   Status, Markers, TypeVarSet, ExistQVars, ArgTypes,
-    %   ClassContext, Assertions, User, VarNameRemap, ProcInfo, ProcId,
+    %   Status, Markers, ArgTypes, TypeVarSet, ExistQVars,
+    %   ClassContext, Assertions, VarNameRemap, ProcInfo, ProcId,
     %   PredInfo)
     %
     % Return a pred_info whose fields are filled in from the information
diff --git a/compiler/jumpopt.m b/compiler/jumpopt.m
index 81acfab..3275218 100644
--- a/compiler/jumpopt.m
+++ b/compiler/jumpopt.m
@@ -490,8 +490,8 @@ jump_opt_llcall(Uinstr0, Comment0, Instrs0, PrevInstr, JumpOptInfo,
             % Look for det style tailcalls. We look for this even if
             % the call is semidet, because one of the optimizations below
             % turns a pair of semidet epilogs into a det epilog.
-            ( CallModel = call_model_det
-            ; CallModel = call_model_semidet
+            ( CallModel = call_model_det(allow_lco)
+            ; CallModel = call_model_semidet(allow_lco)
             ),
             ProcMap = JumpOptInfo ^ joi_proc_map,
             map.search(ProcMap, RetLabel, Between0),
@@ -504,7 +504,7 @@ jump_opt_llcall(Uinstr0, Comment0, Instrs0, PrevInstr, JumpOptInfo,
             NewRemain = specified(NewInstrs, Instrs0)
         ;
             % Look for semidet style tailcalls.
-            CallModel = call_model_semidet,
+            CallModel = call_model_semidet(allow_lco),
             ForkMap = JumpOptInfo ^ joi_fork_map,
             map.search(ForkMap, RetLabel, Between),
             PrevInstr = livevals(Livevals)
diff --git a/compiler/layout_out.m b/compiler/layout_out.m
index f9b24ad..f33ab0d 100644
--- a/compiler/layout_out.m
+++ b/compiler/layout_out.m
@@ -2564,6 +2564,7 @@ pred_transform_name(transform_tuple(Proc)) = "tup_" ++ int_to_string(Proc).
 pred_transform_name(transform_untuple(Proc)) = "untup_" ++ int_to_string(Proc).
 pred_transform_name(transform_dependent_parallel_conjunction) =
     "dep_par_conj_".
+pred_transform_name(transform_parallel_loop_control) = "par_lc".
 pred_transform_name(transform_return_via_ptr(ProcId, ArgPos)) =
     "retptr_" ++ int_to_string(proc_id_to_int(ProcId)) ++ "_args"
         ++ ints_to_string(ArgPos).
diff --git a/compiler/llds.m b/compiler/llds.m
index ecdccfb..7aad16f 100644
--- a/compiler/llds.m
+++ b/compiler/llds.m
@@ -273,9 +273,13 @@
             % Under these conditions, the call can be transformed into a tail
             % call whenever its return address leads to the procedure epilogue.
 
+:- type allow_lco
+    --->    do_not_allow_lco
+    ;       allow_lco.
+
 :- type call_model
-    --->    call_model_det
-    ;       call_model_semidet
+    --->    call_model_det(allow_lco)
+    ;       call_model_semidet(allow_lco)
     ;       call_model_nondet(nondet_tail_call).
 
     % The type defines the various LLDS virtual machine instructions.
@@ -311,9 +315,13 @@
             % on return. The fourth argument gives the context of the call.
             % The fifth gives the goal id of the call in the body of the
             % procedure; it is meaningful only if execution tracing is enabled.
-            % The last gives the code model of the called procedure, and if
-            % it is model_non, says whether tail recursion elimination is
-            % potentially applicable to the call.
+            % The last gives the code model of the called procedure, and says
+            % whether tail recursion elimination may be applied to the call.
+            % For model_non calls, this depends on whether there are any other
+            % stack frames on top of the stack frame of this procedure on the
+            % nondet stack. For model_det and model_semi calls, this depends on
+            % whether there is some other code, executing in parallel with this
+            % context, that uses the current stack frame.
             %
             % The ll prefix on call is to avoid the use of the call keyword
             % and to distinguish this function symbol from a similar one
diff --git a/compiler/mercury_compile_middle_passes.m b/compiler/mercury_compile_middle_passes.m
index 6908bdb..3062904 100644
--- a/compiler/mercury_compile_middle_passes.m
+++ b/compiler/mercury_compile_middle_passes.m
@@ -79,6 +79,7 @@
 :- import_module transform_hlds.lco.
 :- import_module transform_hlds.loop_inv.
 :- import_module transform_hlds.mmc_analysis.
+:- import_module transform_hlds.par_loop_control.
 :- import_module transform_hlds.parallel_to_plain_conj.
 :- import_module transform_hlds.rbmm.
 :- import_module transform_hlds.size_prof.
@@ -221,9 +222,6 @@ middle_pass(ModuleName, !HLDS, !DumpInfo, !IO) :-
     maybe_implicit_parallelism(Verbose, Stats, !HLDS, !IO),
     maybe_dump_hlds(!.HLDS, 173, "implicit_parallelism", !DumpInfo, !IO),
 
-    maybe_lco(Verbose, Stats, !HLDS, !IO),
-    maybe_dump_hlds(!.HLDS, 175, "lco", !DumpInfo, !IO),
-
     maybe_analyse_mm_tabling(Verbose, Stats, !HLDS, !IO),
     maybe_dump_hlds(!.HLDS, 185, "mm_tabling_analysis", !DumpInfo, !IO),
 
@@ -236,6 +234,12 @@ middle_pass(ModuleName, !HLDS, !DumpInfo, !IO) :-
     maybe_impl_dependent_par_conjs(Verbose, Stats, !HLDS, !IO),
     maybe_dump_hlds(!.HLDS, 205, "dependent_par_conj", !DumpInfo, !IO),
 
+    maybe_par_loop_control(Verbose, Stats, !HLDS, !IO),
+    maybe_dump_hlds(!.HLDS, 206, "par_loop_control", !DumpInfo, !IO),
+
+    maybe_lco(Verbose, Stats, !HLDS, !IO),
+    maybe_dump_hlds(!.HLDS, 210, "lco", !DumpInfo, !IO),
+
     % If we are compiling in a deep profiling grade then now rerun simplify.
     % The reason for doing this now is that we want to take advantage of any
     % opportunities the other optimizations have provided for constant
@@ -1305,6 +1309,26 @@ maybe_impl_dependent_par_conjs(Verbose, Stats, !HLDS, !IO) :-
 
 %-----------------------------------------------------------------------------%
 
+:- pred maybe_par_loop_control(bool::in, bool::in,
+    module_info::in, module_info::out, io::di, io::uo) is det.
+
+maybe_par_loop_control(Verbose, Stats, !HLDS, !IO) :-
+    module_info_get_globals(!.HLDS, Globals),
+    globals.lookup_bool_option(Globals, par_loop_control, LoopControl),
+    (
+        LoopControl = yes,
+        maybe_write_string(Verbose,
+            "% Applying parallel loop control transformation...\n", !IO),
+        maybe_flush_output(Verbose, !IO),
+        maybe_par_loop_control_module(!HLDS),
+        maybe_write_string(Verbose, "% done.\n", !IO),
+        maybe_report_stats(Stats, !IO)
+    ;
+        LoopControl = no
+    ).
+
+%-----------------------------------------------------------------------------%
+
 :- pred maybe_term_size_prof(bool::in, bool::in,
     module_info::in, module_info::out, io::di, io::uo) is det.
 
diff --git a/compiler/opt_debug.m b/compiler/opt_debug.m
index 4b6cab0..e1b9525 100644
--- a/compiler/opt_debug.m
+++ b/compiler/opt_debug.m
@@ -829,12 +829,18 @@ dump_instr(MaybeProcLabel, AutoComments, Instr) = Str :-
         Instr = llcall(Callee, ReturnLabel, _LiveInfo, _Context, _GoalPath,
             CallModel),
         (
-            CallModel = call_model_det,
+            CallModel = call_model_det(allow_lco),
             CallModelStr = "det"
         ;
-            CallModel = call_model_semidet,
+            CallModel = call_model_det(do_not_allow_lco),
+            CallModelStr = "det_no_lco"
+        ;
+            CallModel = call_model_semidet(allow_lco),
             CallModelStr = "semidet"
         ;
+            CallModel = call_model_semidet(do_not_allow_lco),
+            CallModelStr = "semidet_no_lco"
+        ;
             CallModel = call_model_nondet(no_tail_call),
             CallModelStr = "nondet no_tail_call"
         ;
diff --git a/compiler/options.m b/compiler/options.m
index 1b0bd63..e0d071d 100644
--- a/compiler/options.m
+++ b/compiler/options.m
@@ -1011,7 +1011,9 @@
     ;       control_granularity
     ;       distance_granularity
     ;       implicit_parallelism
-    ;       feedback_file.
+    ;       feedback_file
+    ;       par_loop_control
+    ;       par_loop_control_preserve_tail_recursion.
 
 %----------------------------------------------------------------------------%
 %----------------------------------------------------------------------------%
@@ -1878,7 +1880,9 @@ option_defaults_2(miscellaneous_option, [
     control_granularity                 -   bool(no),
     distance_granularity                -   int(0),
     implicit_parallelism                -   bool(no),
-    feedback_file                       -   string("")
+    feedback_file                       -   string(""),
+    par_loop_control                    -   bool(no),
+    par_loop_control_preserve_tail_recursion - bool(yes)
 ]).
 
     % please keep this in alphabetic order
@@ -2846,6 +2850,9 @@ long_option("control-granularity",  control_granularity).
 long_option("distance-granularity", distance_granularity).
 long_option("implicit-parallelism", implicit_parallelism).
 long_option("feedback-file",        feedback_file).
+long_option("par-loop-control",     par_loop_control).
+long_option("no-par-loop-control-preserve-tail-recursion",
+                                    par_loop_control_preserve_tail_recursion).
 
 %-----------------------------------------------------------------------------%
 
diff --git a/compiler/par_loop_control.m b/compiler/par_loop_control.m
new file mode 100644
index 0000000..8d93f75
--- /dev/null
+++ b/compiler/par_loop_control.m
@@ -0,0 +1,1167 @@
+%-----------------------------------------------------------------------------%
+% vim: ft=mercury ts=4 sw=4 et
+%-----------------------------------------------------------------------------%
+% Copyright (C) 2011 The University of Melbourne.
+% This file may only be copied under the terms of the GNU General
+% Public License - see the file COPYING in the Mercury distribution.
+%-----------------------------------------------------------------------------%
+%
+% File: par_loop_control.m.
+% Author: pbone.
+%
+% This module implements the parallel loop control transformation.  Parallel
+% conjunctions spawn off their second operand, execute the first and then block
+% waiting for the completion of the second.  Therefore, when the second operand
+% contains a recursive call, the blocked resource consumes memory, which ought
+% to be avoided.
+%
+% This can be avoided by spawning off the first operand and continuing with the
+% second.  This is achieved by the loop control transformation, which provides
+% further optimization by using a different structure for synchronization.
+% There is one barrier in a loop rather than N (for N recursions) and the
+% maximum number of contexts the loop may use is fixed.
+%
+% Consider this loop:
+%
+%   map(M, [], []).
+%   map(M, [X | Xs], [Y | Ys]) :-
+%       (
+%           M(X, Y)
+%       &
+%           map(M, Xs, Ys)
+%       ).
+%
+% It would be transformed to:
+%
+%   map(M, Xs, Ys) :-
+%       create_loop_control(LC, P), % P is the number of contexts to use.
+%       map_lc(LC, M, Xs, Ys).
+%
+%   map_lc(LC, _, [], []) :-
+%       finish_loop_control(LC).
+%   map_lc(LC, M, [X | Xs], [Y | Ys]) :-
+%       wait_free_slot(LC, LCS),
+%       spawn_off(LCS, (
+%           M(X, Y),
+%           join_and_terminate(LC, LCS)
+%       )),
+%       map_lc(LC, M, Xs, Ys). % May not use tail recursion.
+%
+% The parallel conjunction is replaced with wait_free_slot and spawn_off goals
+% for each conjunct except for the last, which is rewritten to call the loop
+% control version of the predicate.
+%
+% Rules:
+%
+% 1. This transformation works when there are multiple parallel conjunctions in
+%    different branches.  It also works when the parallel conjunction has more
+%    than two conjuncts, in which case all but the rightmost conjunct are
+%    replaced with calls to spawn_off.
+%
+% 2. There may be code _after_ the recursive call that consumes variables
+%    produced in the first conjunct.  This is safe because the barrier in the
+%    base case has been executed.  Any consumption before the recursive call
+%    will already be using a future and is safe.
+%
+% 3. There _may not_ be more than one recursive call along any code-path.  That
+%    is to say, the code must be singly recursive so that the base case (and
+%    the barrier within) is executed exactly once.
+%
+% 4. Multiple parallel conjunctions may exist within the body, but due to rule
+%    3, only one of them may contain a recursive call.
+%
+%----------------------------------------------------------------------------%
+
+:- module transform_hlds.par_loop_control.
+:- interface.
+
+:- import_module hlds.hlds_module.
+
+%----------------------------------------------------------------------------%
+
+:- pred maybe_par_loop_control_module(module_info::in, module_info::out)
+    is det.
+
+%----------------------------------------------------------------------------%
+%----------------------------------------------------------------------------%
+
+:- implementation.
+
+:- import_module hlds.goal_path.
+:- import_module hlds.goal_util.
+:- import_module hlds.hlds_goal.
+:- import_module hlds.hlds_pred.
+:- import_module hlds.hlds_rtti.
+:- import_module hlds.instmap.
+:- import_module hlds.passes_aux.
+:- import_module hlds.pred_table.
+:- import_module libs.globals.
+:- import_module libs.options.
+:- import_module mdbcomp.goal_path.
+:- import_module mdbcomp.prim_data.
+:- import_module parse_tree.prog_data.
+:- import_module parse_tree.prog_util.
+:- import_module parse_tree.set_of_var.
+:- import_module transform_hlds.dependency_graph.
+
+:- import_module bool.
+:- import_module digraph.
+:- import_module list.
+:- import_module map.
+:- import_module maybe.
+:- import_module pair.
+:- import_module require.
+:- import_module set.
+:- import_module string.
+:- import_module varset.
+
+%----------------------------------------------------------------------------%
+
+maybe_par_loop_control_module(!ModuleInfo) :-
+    module_info_rebuild_dependency_info(!ModuleInfo, DepInfo),
+    process_all_nonimported_procs(
+        update_module(maybe_par_loop_control_proc(DepInfo)),
+        !ModuleInfo).
+
+:- pred maybe_par_loop_control_proc(dependency_info::in, pred_proc_id::in,
+    proc_info::in, proc_info::out, module_info::in, module_info::out) is det.
+
+maybe_par_loop_control_proc(DepInfo, PredProcId, !ProcInfo, !ModuleInfo) :-
+    ( loop_control_is_applicable(DepInfo, PredProcId, !.ProcInfo) ->
+        proc_info_get_goal(!.ProcInfo, Body0),
+
+        % Re-calculate goal ids.
+        proc_info_get_vartypes(!.ProcInfo, VarTypes),
+        fill_goal_id_slots_in_proc_body(!.ModuleInfo, VarTypes,
+            ContainingGoalMap, Body0, Body),
+        proc_info_set_goal(Body, !ProcInfo),
+        goal_get_loop_control_par_conjs(Body, PredProcId,
+            RecursiveParConjIds),
+        (
+            ( RecursiveParConjIds = have_not_seen_recursive_call
+            ; RecursiveParConjIds = seen_one_recursive_call_on_every_branch
+            ; RecursiveParConjIds = seen_unusable_recursion
+            )
+        ;
+            RecursiveParConjIds = seen_usable_recursion_in_par_conj(GoalIds),
+
+            % Go ahead and perform the transformation.
+            create_inner_proc(GoalIds, PredProcId, !.ProcInfo,
+                ContainingGoalMap,  InnerPredProcId, InnerPredName,
+                !ModuleInfo),
+            update_outer_proc(PredProcId, InnerPredProcId, InnerPredName,
+                !.ModuleInfo, !ProcInfo)
+        )
+    ;
+        true
+    ).
+
+%----------------------------------------------------------------------------%
+
+    % Loop control may be applicable if the procedure contains a parallel
+    % conjunction, is det or cc_multi, and is directly but not mutually
+    % recursive.  goal_get_loop_control_par_conjs makes the final decision.
+    %
+:- pred loop_control_is_applicable(dependency_info::in, pred_proc_id::in,
+    proc_info::in) is semidet.
+
+loop_control_is_applicable(DepInfo, PredProcId, ProcInfo) :-
+    proc_info_get_has_parallel_conj(ProcInfo, yes),
+    proc_info_get_inferred_determinism(ProcInfo, Detism),
+    % If the predicate itself is not deterministic then its recursive call
+    % will not be deterministic and therefore will not be found in a parallel
+    % conjunction.
+    ( Detism = detism_det
+    ; Detism = detism_cc_multi
+    ),
+    proc_is_self_recursive(DepInfo, PredProcId).
+
+:- pred proc_is_self_recursive(dependency_info::in, pred_proc_id::in)
+    is semidet.
+
+proc_is_self_recursive(DepInfo, PredProcId) :-
+    hlds_dependency_info_get_dependency_graph(DepInfo, DepGraph),
+
+    % There must be a directly recursive call.
+    digraph.lookup_key(DepGraph, PredProcId, SelfKey),
+    digraph.is_edge(DepGraph, SelfKey, SelfKey),
+
+    % There must not be an indirectly recursive call.
+    % Note: we could handle this in the future by inlining one call within
+    % another, but recursion analysis in the deep profiler should support this
+    % first.
+    digraph.delete_edge(SelfKey, SelfKey, DepGraph, DepGraphWOSelfEdge),
+    digraph.tc(DepGraphWOSelfEdge, TCDepGraphWOSelfEdge),
+    not digraph.is_edge(TCDepGraphWOSelfEdge, SelfKey, SelfKey).
+
+%----------------------------------------------------------------------------%
+
+:- type seen_usable_recursion
+    --->    have_not_seen_recursive_call
+                % There is no reachable recursive call in this goal.
+
+    ;       seen_one_recursive_call_on_every_branch
+                % There is exactly one recursive call on every reachable
+                % branch.  Therefore this single recursion can be used if it is
+                % within a parallel conjunction.
+
+    ;       seen_unusable_recursion
+                % There is recursion but we cannot use it.  There are a number
+                % of possible reasons, including:
+                %   + Multiple recursion.
+                %   + Recursion on some but not all branches or in code that is
+                %     not det/cc_multi.
+                %   + Usable recursion inside a parallel conjunction that is
+                %     inside _another_ parallel conjunction.
+
+    ;       seen_usable_recursion_in_par_conj(list(goal_id)).
+                % There is recursion within the right-most conjunct of a
+                % parallel conjunction.  There may be multiple cases of this
+                % (different parallel conjunctions in different branches).
+
+    % This subtype of seen usable recursion is the set of values for which we
+    % should keep searching.
+    %
+:- inst seen_usable_recursion_continue
+    --->    have_not_seen_recursive_call
+    ;       seen_one_recursive_call_on_every_branch
+    ;       seen_usable_recursion_in_par_conj(ground).
+
+:- pred goal_get_loop_control_par_conjs(hlds_goal::in, pred_proc_id::in,
+    seen_usable_recursion::out) is det.
+
+goal_get_loop_control_par_conjs(Goal, SelfPredProcId, SeenUsableRecursion) :-
+    Goal = hlds_goal(GoalExpr, GoalInfo),
+    Detism = goal_info_get_determinism(GoalInfo),
+    InstmapDelta = goal_info_get_instmap_delta(GoalInfo),
+    ( instmap_delta_is_reachable(InstmapDelta) ->
+        (
+            GoalExpr = unify(_, _, _, _, _),
+            SeenUsableRecursion0 = have_not_seen_recursive_call
+        ;
+            GoalExpr = plain_call(PredId, ProcId, _, _, _, _),
+            ( SelfPredProcId = proc(PredId, ProcId) ->
+                SeenUsableRecursion0 =
+                    seen_one_recursive_call_on_every_branch
+            ;
+                SeenUsableRecursion0 = have_not_seen_recursive_call
+            )
+        ;
+            GoalExpr = generic_call(_, _, _, _),
+            % We cannot determine if a generic call is recursive or not;
+            % however, it most likely is not.  In either case we cannot perform
+            % the loop control transformation.
+            SeenUsableRecursion0 = have_not_seen_recursive_call
+        ;
+            GoalExpr = call_foreign_proc(_, _, _, _, _, _, _),
+            SeenUsableRecursion0 = have_not_seen_recursive_call
+        ;
+            GoalExpr = conj(ConjType, Conjs),
+            (
+                ConjType = plain_conj,
+                conj_get_loop_control_par_conjs(Conjs, SelfPredProcId,
+                    have_not_seen_recursive_call, SeenUsableRecursion0)
+            ;
+                ConjType = parallel_conj,
+                GoalId = goal_info_get_goal_id(GoalInfo),
+                par_conj_get_loop_control_par_conjs(Conjs, SelfPredProcId,
+                    GoalId, SeenUsableRecursion0)
+            )
+        ;
+            GoalExpr = disj(Disjs),
+            % If the disjunction contains a recursive call at all then the
+            % recursive call is in an unusable context.
+            (
+                member(Disj, Disjs),
+                goal_calls(Disj, SelfPredProcId)
+            ->
+                SeenUsableRecursion0 = seen_unusable_recursion
+            ;
+                SeenUsableRecursion0 = have_not_seen_recursive_call
+            )
+        ;
+            GoalExpr = switch(_, _CanFail, Cases),
+            map(case_get_loop_control_par_conjs(SelfPredProcId), Cases,
+                SeenUsableRecursionCases),
+            % If the switch can fail then there is effectively another branch
+            % that has no recursive call.  However, we do not need to test for
+            % it here as checking the determinism of the goal will detect such
+            % a case.
+            merge_loop_control_par_conjs_between_branches_list(
+                SeenUsableRecursionCases, SeenUsableRecursion0)
+        ;
+            GoalExpr = negation(SubGoal),
+            goal_get_loop_control_par_conjs(SubGoal, SelfPredProcId,
+                SeenUsableRecursion0)
+            % If the negation can fail (I don't see how it could possibly be
+            % 'det') then the code that checks the determinism below will
+            % ensure that any recursion found here is unusable (as for
+            % can-fail switches).
+        ;
+            GoalExpr = scope(_, SubGoal),
+            goal_get_loop_control_par_conjs(SubGoal, SelfPredProcId,
+                SeenUsableRecursion0)
+        ;
+            GoalExpr = if_then_else(_, Cond, Then, Else),
+            goal_get_loop_control_par_conjs(Cond, SelfPredProcId,
+                SeenUsableRecursionCond),
+            (
+                SeenUsableRecursionCond = have_not_seen_recursive_call,
+                goal_get_loop_control_par_conjs(Then, SelfPredProcId,
+                    SeenUsableRecursionThen),
+                goal_get_loop_control_par_conjs(Else, SelfPredProcId,
+                    SeenUsableRecursionElse),
+                merge_loop_control_par_conjs_between_branches(
+                    SeenUsableRecursionThen, SeenUsableRecursionElse,
+                    SeenUsableRecursion0)
+            ;
+                % We can't make use of any recursion found in the condition of
+                % an if-then-else.
+                ( SeenUsableRecursionCond =
+                        seen_one_recursive_call_on_every_branch
+                ; SeenUsableRecursionCond = seen_unusable_recursion
+                ; SeenUsableRecursionCond = seen_usable_recursion_in_par_conj(_)
+                ),
+                SeenUsableRecursion0 = seen_unusable_recursion
+            )
+        ;
+            GoalExpr = shorthand(_),
+            unexpected($module, $pred, "shorthand")
+        ),
+
+        % If the goal might fail or might succeed more than once then the
+        % recursion is unusable for loop control.
+        (
+            ( SeenUsableRecursion0 = have_not_seen_recursive_call
+            ; SeenUsableRecursion0 = seen_unusable_recursion
+            ),
+            SeenUsableRecursion = SeenUsableRecursion0
+        ;
+            ( SeenUsableRecursion0 = seen_one_recursive_call_on_every_branch
+            ; SeenUsableRecursion0 = seen_usable_recursion_in_par_conj(_)
+            ),
+            (
+                ( Detism = detism_det
+                ; Detism = detism_cc_multi
+                ),
+                SeenUsableRecursion0 = SeenUsableRecursion
+            ;
+                ( Detism = detism_semi
+                ; Detism = detism_multi
+                ; Detism = detism_non
+                ; Detism = detism_cc_non
+                ; Detism = detism_erroneous
+                ; Detism = detism_failure
+                ),
+                SeenUsableRecursion = seen_unusable_recursion
+            )
+        )
+    ;
+        % InstmapDelta is unreachable.
+        SeenUsableRecursion = have_not_seen_recursive_call
+    ).
+
+    % Analyze the parallel conjunction for a usable recursive call.
+    %
+    % If any conjunct other than the last contains a recursive call then that
+    % call is unusable.  If only the last conjunct contains a recursive call
+    % then it is usable.
+    %
+:- pred par_conj_get_loop_control_par_conjs(list(hlds_goal)::in,
+    pred_proc_id::in, goal_id::in, seen_usable_recursion::out) is det.
+
+par_conj_get_loop_control_par_conjs(Conjs, SelfPredProcId,
+        GoalId, SeenUsableRecursion) :-
+    (
+        Conjs = [],
+        unexpected($module, $pred, "Empty parallel conjunction")
+    ;
+        Conjs = [_ | _],
+        par_conj_get_loop_control_par_conjs_2(Conjs, SelfPredProcId,
+            SeenUsableRecursion0),
+        (
+            SeenUsableRecursion0 = have_not_seen_recursive_call,
+            SeenUsableRecursion = SeenUsableRecursion0
+        ;
+            SeenUsableRecursion0 = seen_one_recursive_call_on_every_branch,
+            SeenUsableRecursion = seen_usable_recursion_in_par_conj([GoalId])
+        ;
+            ( SeenUsableRecursion0 = seen_unusable_recursion
+            ; SeenUsableRecursion0 = seen_usable_recursion_in_par_conj(_)
+            ),
+            SeenUsableRecursion = seen_unusable_recursion
+        )
+    ).
+
+:- pred par_conj_get_loop_control_par_conjs_2(
+    list(hlds_goal)::in(non_empty_list), pred_proc_id::in,
+    seen_usable_recursion::out) is det.
+
+par_conj_get_loop_control_par_conjs_2([Conj | Conjs], SelfPredProcId,
+        SeenUsableRecursion) :-
+    goal_get_loop_control_par_conjs(Conj, SelfPredProcId,
+        SeenUsableRecursion0),
+    (
+        % This is the last conjunct.  Therefore, if it contains a recursive
+        % call it is the recursion we're looking for.
+        Conjs = [],
+        SeenUsableRecursion = SeenUsableRecursion0
+    ;
+        Conjs = [_ | _],
+        % This is not the last conjunct.  Therefore any recursion it contains
+        % is unusable.
+        (
+            ( SeenUsableRecursion0 = seen_one_recursive_call_on_every_branch
+            ; SeenUsableRecursion0 = seen_unusable_recursion
+            ; SeenUsableRecursion0 = seen_usable_recursion_in_par_conj(_)
+            ),
+            SeenUsableRecursion = seen_unusable_recursion
+        ;
+            SeenUsableRecursion0 = have_not_seen_recursive_call,
+            % Analyze the rest of the conjunction.
+            par_conj_get_loop_control_par_conjs_2(Conjs, SelfPredProcId,
+                SeenUsableRecursion)
+        )
+    ).
+
+:- pred conj_get_loop_control_par_conjs(hlds_goals::in, pred_proc_id::in,
+    seen_usable_recursion::in(seen_usable_recursion_continue),
+    seen_usable_recursion::out) is det.
+
+conj_get_loop_control_par_conjs([], _, !SeenUsableRecursion).
+conj_get_loop_control_par_conjs([Conj | Conjs], SelfPredProcId,
+        !SeenUsableRecursion) :-
+    goal_get_loop_control_par_conjs(Conj, SelfPredProcId,
+        SeenUsableRecursionConj),
+    merge_loop_control_par_conjs_sequential(SeenUsableRecursionConj,
+        !SeenUsableRecursion),
+    (
+        !.SeenUsableRecursion = seen_unusable_recursion
+    ;
+        ( !.SeenUsableRecursion = seen_one_recursive_call_on_every_branch
+        ; !.SeenUsableRecursion = seen_usable_recursion_in_par_conj(_)
+        ; !.SeenUsableRecursion = have_not_seen_recursive_call
+        ),
+        conj_get_loop_control_par_conjs(Conjs, SelfPredProcId,
+            !SeenUsableRecursion)
+    ).
+
+:- pred case_get_loop_control_par_conjs(pred_proc_id::in, case::in,
+        seen_usable_recursion::out) is det.
+
+case_get_loop_control_par_conjs(SelfPredProcId, case(_, _, Goal),
+        SeenUsableRecursion) :-
+    goal_get_loop_control_par_conjs(Goal, SelfPredProcId,
+        SeenUsableRecursion).
+
+:- pred merge_loop_control_par_conjs_sequential(seen_usable_recursion::in,
+    seen_usable_recursion::in, seen_usable_recursion::out) is det.
+
+merge_loop_control_par_conjs_sequential(have_not_seen_recursive_call,
+        Seen, Seen).
+merge_loop_control_par_conjs_sequential(seen_unusable_recursion,
+        _, seen_unusable_recursion).
+merge_loop_control_par_conjs_sequential(
+        seen_one_recursive_call_on_every_branch, Seen0, Seen) :-
+    (
+        Seen0 = have_not_seen_recursive_call,
+        Seen = seen_one_recursive_call_on_every_branch
+    ;
+        ( Seen0 = seen_one_recursive_call_on_every_branch
+        ; Seen0 = seen_unusable_recursion
+        ; Seen0 = seen_usable_recursion_in_par_conj(_)
+        ),
+        Seen = seen_unusable_recursion
+    ).
+merge_loop_control_par_conjs_sequential(
+        seen_usable_recursion_in_par_conj(GoalIds), Seen0, Seen) :-
+    (
+        Seen0 = have_not_seen_recursive_call,
+        Seen = seen_usable_recursion_in_par_conj(GoalIds)
+    ;
+        ( Seen0 = seen_one_recursive_call_on_every_branch
+        ; Seen0 = seen_unusable_recursion
+        ; Seen0 = seen_usable_recursion_in_par_conj(_)
+        ),
+        Seen = seen_unusable_recursion
+    ).
+
+:- pred merge_loop_control_par_conjs_between_branches_list(
+    list(seen_usable_recursion)::in, seen_usable_recursion::out) is det.
+
+merge_loop_control_par_conjs_between_branches_list([],
+        have_not_seen_recursive_call).
+merge_loop_control_par_conjs_between_branches_list([Seen | Seens], Result) :-
+    foldl(merge_loop_control_par_conjs_between_branches, Seens, Seen, Result).
+
+:- pred merge_loop_control_par_conjs_between_branches(
+    seen_usable_recursion::in, seen_usable_recursion::in,
+    seen_usable_recursion::out) is det.
+
+merge_loop_control_par_conjs_between_branches(have_not_seen_recursive_call,
+        Seen0, Seen) :-
+    (
+        Seen0 = have_not_seen_recursive_call,
+        Seen = have_not_seen_recursive_call
+    ;
+        ( Seen0 = seen_one_recursive_call_on_every_branch
+        ; Seen0 = seen_unusable_recursion
+        ),
+        Seen = seen_unusable_recursion
+    ;
+        Seen0 = seen_usable_recursion_in_par_conj(_),
+        Seen = Seen0
+    ).
+merge_loop_control_par_conjs_between_branches(
+        seen_one_recursive_call_on_every_branch, Seen0, Seen) :-
+    (
+        Seen0 = seen_one_recursive_call_on_every_branch,
+        Seen = Seen0
+    ;
+        ( Seen0 = have_not_seen_recursive_call
+        ; Seen0 = seen_unusable_recursion
+        ; Seen0 = seen_usable_recursion_in_par_conj(_)
+        ),
+        Seen = seen_unusable_recursion
+    ).
+merge_loop_control_par_conjs_between_branches(seen_unusable_recursion, _,
+        seen_unusable_recursion).
+merge_loop_control_par_conjs_between_branches(
+        seen_usable_recursion_in_par_conj(GoalIdsA), Seen0, Seen) :-
+    (
+        Seen0 = have_not_seen_recursive_call,
+        Seen = seen_usable_recursion_in_par_conj(GoalIdsA)
+    ;
+        ( Seen0 = seen_one_recursive_call_on_every_branch
+        ; Seen0 = seen_unusable_recursion
+        ),
+        Seen = seen_unusable_recursion
+    ;
+        Seen0 = seen_usable_recursion_in_par_conj(GoalIdsB),
+        % We do the concatenation in this order so that it is not quadratic
+        % when called from merge_loop_control_par_conjs_between_branches_list.
+        GoalIds = GoalIdsA ++ GoalIdsB,
+        Seen = seen_usable_recursion_in_par_conj(GoalIds)
+    ).
+
+%----------------------------------------------------------------------------%
+
+:- pred create_inner_proc(list(goal_id)::in, pred_proc_id::in, proc_info::in,
+    containing_goal_map::in, pred_proc_id::out, sym_name::out,
+    module_info::in, module_info::out) is det.
+
+create_inner_proc(RecParConjIds, OldPredProcId, OldProcInfo,
+        ContainingGoalMap, PredProcId, PredSym, !ModuleInfo) :-
+    proc(OldPredId, OldProcId) = OldPredProcId,
+    module_info_pred_info(!.ModuleInfo, OldPredId, OldPredInfo),
+
+    % Gather data to build the new pred/proc.
+    module_info_get_name(!.ModuleInfo, ModuleName),
+    PredOrFunc = pred_info_is_pred_or_func(OldPredInfo),
+    make_pred_name(ModuleName, "LoopControl", yes(PredOrFunc),
+        pred_info_name(OldPredInfo), newpred_parallel_loop_control, PredSym0),
+    % The mode number is included because we want to avoid the creation of
+    % more than one predicate with the same name if more than one mode of
+    % a predicate is parallelised. Since the names of e.g. deep profiling
+    % proc_static structures are derived from the names of predicates,
+    % duplicate predicate names lead to duplicate global variable names
+    % and hence to link errors.
+    proc_id_to_int(OldProcId, OldProcInt),
+    add_sym_name_suffix(PredSym0, "_" ++ int_to_string(OldProcInt), PredSym),
+    pred_info_get_context(OldPredInfo, Context),
+    pred_info_get_origin(OldPredInfo, OldOrigin),
+    Origin = origin_transformed(transform_parallel_loop_control, OldOrigin,
+        OldPredId),
+    some [!Markers] (
+        init_markers(!:Markers),
+        add_marker(marker_is_impure, !Markers),
+        add_marker(marker_calls_are_fully_qualified, !Markers),
+        Markers = !.Markers
+    ),
+    pred_info_get_typevarset(OldPredInfo, TypeVarSet),
+    pred_info_get_exist_quant_tvars(OldPredInfo, ExistQVars),
+    pred_info_get_class_context(OldPredInfo, ClassConstraints),
+    pred_info_get_arg_types(OldPredInfo, ArgTypes0),
+
+    some [!PredInfo] (
+        % Construct the pred info structure.  We initially construct it with
+        % the old proc info which will be replaced below.
+        pred_info_create(ModuleName, PredSym, PredOrFunc, Context, Origin,
+            status_local, Markers, ArgTypes0, TypeVarSet, ExistQVars,
+            ClassConstraints, set.init, map.init, OldProcInfo, ProcId,
+            !:PredInfo),
+
+        % Add the new predicate to the module.
+        some [!PredTable] (
+            module_info_get_predicate_table(!.ModuleInfo, !:PredTable),
+            predicate_table_insert(!.PredInfo, PredId, !PredTable),
+            module_info_set_predicate_table(!.PredTable, !ModuleInfo)
+        ),
+        PredProcId = proc(PredId, ProcId),
+
+        % Now transform the predicate.  This could not be done earlier because
+        % we needed to know the new PredProcId to rewrite the recursive calls
+        % in the body.
+        proc_info_get_argmodes(OldProcInfo, ArgModes0),
+        proc_info_get_headvars(OldProcInfo, HeadVars0),
+        proc_info_get_varset(OldProcInfo, VarSet0),
+        proc_info_get_vartypes(OldProcInfo, VarTypes0),
+        proc_info_get_goal(OldProcInfo, Body0),
+
+        varset.new_named_var("LC", LCVar, VarSet0, VarSet1),
+        map.det_insert(LCVar, loop_control_var_type, VarTypes0, VarTypes1),
+        should_preserve_tail_recursion(!.ModuleInfo, PreserveTailRecursion),
+        get_wait_free_slot_proc(!.ModuleInfo, WaitFreeSlotProc),
+
+        Info = loop_control_info(LCVar, OldPredProcId, PredProcId, PredSym,
+            PreserveTailRecursion, WaitFreeSlotProc, lc_wait_free_slot_name),
+        goal_loop_control_all_paths(Info, RecParConjIds,
+            ContainingGoalMap, Body0, Body, VarSet1, VarSet,
+            VarTypes1, VarTypes),
+
+        % Now create the new proc_info structure.
+        HeadVars = [LCVar | HeadVars0],
+        ArgTypes = [loop_control_var_type | ArgTypes0],
+        Ground = ground(shared, none),
+        In = (Ground -> Ground),
+        ArgModes = [In | ArgModes0],
+
+        proc_info_get_inst_varset(OldProcInfo, InstVarSet),
+        proc_info_get_rtti_varmaps(OldProcInfo, RttiVarMaps),
+        proc_info_get_inferred_determinism(OldProcInfo, Detism),
+        proc_info_create(Context, VarSet, VarTypes, HeadVars, InstVarSet,
+            ArgModes, detism_decl_none, Detism, Body, RttiVarMaps,
+            address_is_not_taken, map.init, ProcInfo),
+
+        % Update the other structures.
+        pred_info_set_arg_types(TypeVarSet, ExistQVars, ArgTypes, !PredInfo),
+        pred_info_set_proc_info(ProcId, ProcInfo, !PredInfo),
+        module_info_set_pred_info(PredId, !.PredInfo, !ModuleInfo)
+    ).
+
+:- pred should_preserve_tail_recursion(module_info::in,
+    preserve_tail_recursion::out) is det.
+
+should_preserve_tail_recursion(ModuleInfo, PreserveTailRecursion) :-
+    module_info_get_globals(ModuleInfo, Globals),
+    globals.lookup_bool_option(Globals,
+        par_loop_control_preserve_tail_recursion, PreserveTailRecursionBool),
+    (
+        PreserveTailRecursionBool = yes,
+        PreserveTailRecursion = preserve_tail_recursion
+    ;
+        PreserveTailRecursionBool = no,
+        PreserveTailRecursion = do_not_preserve_tail_recursion
+    ).
+
+:- type loop_control_info
+    --->    loop_control_info(
+                lci_lc_var                      :: prog_var,
+                lci_rec_pred_proc_id            :: pred_proc_id,
+                lci_inner_pred_proc_id          :: pred_proc_id,
+                lci_inner_pred_name             :: sym_name,
+                lci_preserve_tail_recursion     :: preserve_tail_recursion,
+                lci_wait_free_slot_proc         :: pred_proc_id,
+                lci_wait_free_slot_proc_name    :: sym_name
+            ).
+
+:- type preserve_tail_recursion
+    --->    preserve_tail_recursion
+    ;       do_not_preserve_tail_recursion.
+
+:- pred goal_loop_control_all_paths(loop_control_info::in, list(goal_id)::in,
+    containing_goal_map::in, hlds_goal::in, hlds_goal::out,
+    prog_varset::in, prog_varset::out, vartypes::in, vartypes::out) is det.
+
+goal_loop_control_all_paths(Info, GoalIds, ContainingGoalMap, !Goal,
+        !VarSet, !VarTypes) :-
+    GoalPaths = map(goal_id_to_forward_path(ContainingGoalMap), GoalIds),
+    foldl3(goal_loop_control(Info), GoalPaths, !Goal, !VarSet,
+        !VarTypes).
+
+:- pred goal_loop_control(loop_control_info::in, forward_goal_path::in,
+    hlds_goal::in, hlds_goal::out, prog_varset::in, prog_varset::out,
+    vartypes::in, vartypes::out) is det.
+
+goal_loop_control(Info, GoalPath0, !Goal, !VarSet, !VarTypes) :-
+    !.Goal = hlds_goal(GoalExpr0, GoalInfo),
+    ( goal_path_remove_first(GoalPath0, GoalPath, Step) ->
+        format("Couldn't follow goal path step: \"%s\"", [s(string(Step))],
+            ErrorString),
+        (
+            Step = step_conj(N),
+            (
+                GoalExpr0 = conj(plain_conj, Conjs0),
+                list.index1(Conjs0, N, Conj0)
+            ->
+                goal_loop_control(Info, GoalPath, Conj0, Conj,
+                    !VarSet, !VarTypes),
+                det_replace_nth(Conjs0, N, Conj, Conjs),
+                GoalExpr = conj(plain_conj, Conjs)
+            ;
+                unexpected($module, $pred, ErrorString)
+            )
+        ;
+            Step = step_switch(N, _),
+            (
+                GoalExpr0 = switch(Var, CanFail, Cases0),
+                list.index1(Cases0, N, Case0)
+            ->
+                Goal0 = Case0 ^ case_goal,
+                goal_loop_control(Info, GoalPath, Goal0, Goal,
+                    !VarSet, !VarTypes),
+                Case = Case0 ^ case_goal := Goal,
+                det_replace_nth(Cases0, N, Case, Cases),
+                GoalExpr = switch(Var, CanFail, Cases)
+            ;
+                unexpected($module, $pred, ErrorString)
+            )
+        ;
+            Step = step_ite_then,
+            ( GoalExpr0 = if_then_else(Vars, Cond, Then0, Else) ->
+                goal_loop_control(Info, GoalPath, Then0, Then,
+                    !VarSet, !VarTypes),
+                GoalExpr = if_then_else(Vars, Cond, Then, Else)
+            ;
+                unexpected($module, $pred, ErrorString)
+            )
+        ;
+            Step = step_ite_else,
+            ( GoalExpr0 = if_then_else(Vars, Cond, Then, Else0) ->
+                goal_loop_control(Info, GoalPath, Else0, Else,
+                    !VarSet, !VarTypes),
+                GoalExpr = if_then_else(Vars, Cond, Then, Else)
+            ;
+                unexpected($module, $pred, ErrorString)
+            )
+        ;
+            Step = step_scope(_),
+            ( GoalExpr0 = scope(Reason, SubGoal0) ->
+                goal_loop_control(Info, GoalPath, SubGoal0, SubGoal,
+                    !VarSet, !VarTypes),
+                GoalExpr = scope(Reason, SubGoal)
+            ;
+                unexpected($module, $pred, ErrorString)
+            )
+        ;
+            ( Step = step_ite_cond
+            ; Step = step_disj(_)
+            ; Step = step_neg
+            ; Step = step_lambda
+            ; Step = step_try
+            ; Step = step_atomic_main
+            ; Step = step_atomic_orelse(_)
+            ),
+            unexpected($module, $pred,
+                format("Unexpected step in goal path \"%s\"",
+                [s(string(Step))]))
+        ),
+        !:Goal = hlds_goal(GoalExpr, GoalInfo),
+        fixup_goal_info(Info, !Goal)
+    ;
+        ( GoalExpr0 = conj(parallel_conj, Conjs) ->
+            par_conj_loop_control(Info, reverse(Conjs), GoalInfo, !:Goal,
+                !VarSet, !VarTypes)
+        ;
+            unexpected($module, $pred, "expected parallel conjunction")
+        )
+    ).
+
+:- pred par_conj_loop_control(loop_control_info::in, list(hlds_goal)::in,
+    hlds_goal_info::in, hlds_goal::out, prog_varset::in, prog_varset::out,
+    vartypes::in, vartypes::out) is det.
+
+par_conj_loop_control(_, [], _, _, !VarSet, !VarTypes) :-
+    unexpected($module, $pred, "empty parallel conjunction").
+par_conj_loop_control(Info, [LastConj0 | RevConjs], GoalInfo, Goal, !VarSet,
+        !VarTypes) :-
+    % Re-write the recursive call in the last conjunct.
+    goal_rewrite_recursive_call(Info, LastConj0, LastConj, _),
+    expand_plain_conj(LastConj, LastConjGoals),
+
+    % Process the remaining conjuncts in reverse order, prepending each one's
+    % goals to the goals that come after it.
+    par_conj_loop_control2(Info, RevConjs, LastConjGoals, Goals, !VarSet,
+        !VarTypes),
+    create_conj_from_list(Goals, plain_conj, Goal0),
+    Goal1 = Goal0 ^ hlds_goal_info := GoalInfo,
+    fixup_goal_info(Info, Goal1, Goal).
+
+    % Process each of the conjuncts in reverse order, building the new
+    % expression from them.
+    %
+:- pred par_conj_loop_control2(loop_control_info::in, list(hlds_goal)::in,
+    list(hlds_goal)::in, list(hlds_goal)::out,
+    prog_varset::in, prog_varset::out, vartypes::in, vartypes::out) is det.
+
+par_conj_loop_control2(_, [], LaterGoals, LaterGoals, !VarSet, !VarTypes).
+par_conj_loop_control2(Info, [Conj | RevConjs], LaterGoals, Goals, !VarSet,
+        !VarTypes) :-
+    % Create the "get free slot" call.
+    create_get_free_slot_goal(Info, LCSVar, GetFreeSlotGoal, !VarSet,
+        !VarTypes),
+
+    % Wrap Conj in the loop control scope.
+    LCVar = Info ^ lci_lc_var,
+    ConjGoalInfo = Conj ^ hlds_goal_info,
+    some [!NonLocals] (
+        !:NonLocals = goal_info_get_nonlocals(ConjGoalInfo),
+        insert(LCSVar, !NonLocals),
+        insert(LCVar, !NonLocals),
+        goal_info_set_nonlocals(!.NonLocals, ConjGoalInfo, ScopeGoalInfo)
+    ),
+    ScopeGoal = hlds_goal(scope(loop_control(LCVar, LCSVar), Conj),
+        ScopeGoalInfo),
+
+    % Process earlier conjuncts.
+    Goals0 = [GetFreeSlotGoal, ScopeGoal | LaterGoals],
+    par_conj_loop_control2(Info, RevConjs, Goals0, Goals, !VarSet, !VarTypes).
+
+    % Re-write any recursive calls in this goal.
+    %
+    % This predicate's argument order does not conform to the Mercury coding
+    % standards; this is deliberate, as it makes it easier to call from
+    % list.map2.
+    %
+:- pred goal_rewrite_recursive_call(loop_control_info::in,
+    hlds_goal::in, hlds_goal::out, fixup_goal_info::out) is det.
+
+goal_rewrite_recursive_call(Info, !Goal, FixupGoalInfo) :-
+    !.Goal = hlds_goal(GoalExpr0, GoalInfo),
+    (
+        ( GoalExpr0 = unify(_, _, _, _, _)
+        ; GoalExpr0 = generic_call(_, _, _, _)
+        ; GoalExpr0 = call_foreign_proc(_, _, _, _, _, _, _)
+        ),
+        GoalExpr = GoalExpr0,
+        FixupGoalInfo = do_not_fixup_goal_info
+    ;
+        GoalExpr0 = plain_call(CallPredId0, CallProcId0, Args0, Builtin,
+            MaybeUnify, _Name0),
+        RecPredProcId = Info ^ lci_rec_pred_proc_id,
+        ( RecPredProcId = proc(CallPredId0, CallProcId0) ->
+            NewPredProcId = Info ^ lci_inner_pred_proc_id,
+            proc(CallPredId, CallProcId) = NewPredProcId,
+            LCVar = Info ^ lci_lc_var,
+            Args = [LCVar | Args0],
+            Name = Info ^ lci_inner_pred_name,
+            GoalExpr = plain_call(CallPredId, CallProcId, Args, Builtin,
+                MaybeUnify, Name),
+            FixupGoalInfo = fixup_goal_info
+        ;
+            GoalExpr = GoalExpr0,
+            FixupGoalInfo = do_not_fixup_goal_info
+        )
+    ;
+        GoalExpr0 = conj(ConjType, Conjs0),
+        map2(goal_rewrite_recursive_call(Info), Conjs0, Conjs,
+            FixupGoalInfoConjs),
+        goals_fixup_goal_info(FixupGoalInfoConjs, FixupGoalInfo),
+        GoalExpr = conj(ConjType, Conjs)
+    ;
+        GoalExpr0 = disj(Disjs0),
+        map2(goal_rewrite_recursive_call(Info), Disjs0, Disjs,
+            FixupGoalInfoDisjs),
+        goals_fixup_goal_info(FixupGoalInfoDisjs, FixupGoalInfo),
+        GoalExpr = disj(Disjs)
+    ;
+        GoalExpr0 = switch(Var, CanFail, Cases0),
+        map2(case_rewrite_recursive_call(Info), Cases0, Cases,
+            FixupGoalInfoCases),
+        goals_fixup_goal_info(FixupGoalInfoCases, FixupGoalInfo),
+        GoalExpr = switch(Var, CanFail, Cases)
+    ;
+        GoalExpr0 = negation(SubGoal0),
+        goal_rewrite_recursive_call(Info, SubGoal0, SubGoal,
+            FixupGoalInfo),
+        GoalExpr = negation(SubGoal)
+    ;
+        GoalExpr0 = scope(Reason, SubGoal0),
+        goal_rewrite_recursive_call(Info, SubGoal0, SubGoal,
+            FixupGoalInfo),
+        GoalExpr = scope(Reason, SubGoal)
+    ;
+        GoalExpr0 = if_then_else(Vars, Cond0, Then0, Else0),
+        goal_rewrite_recursive_call(Info, Cond0, Cond, FixupGoalInfoCond),
+        goal_rewrite_recursive_call(Info, Then0, Then, FixupGoalInfoThen),
+        goal_rewrite_recursive_call(Info, Else0, Else, FixupGoalInfoElse),
+        goals_fixup_goal_info([FixupGoalInfoCond, FixupGoalInfoThen,
+                FixupGoalInfoElse], FixupGoalInfo),
+        GoalExpr = if_then_else(Vars, Cond, Then, Else)
+    ;
+        GoalExpr0 = shorthand(_),
+        unexpected($module, $pred, "shorthand")
+    ),
+    !:Goal = hlds_goal(GoalExpr, GoalInfo),
+    (
+        FixupGoalInfo = fixup_goal_info,
+        fixup_goal_info(Info, !Goal),
+        ( GoalExpr = plain_call(_, _, _, _, _, _) ->
+            goal_add_feature(feature_do_not_tailcall, !Goal)
+        ;
+            true
+        )
+    ;
+        FixupGoalInfo = do_not_fixup_goal_info
+    ).
+
+:- pred case_rewrite_recursive_call(loop_control_info::in,
+    case::in, case::out, fixup_goal_info::out) is det.
+
+case_rewrite_recursive_call(Info, !Case, FixupGoalInfo) :-
+    some [!Goal] (
+        !:Goal = !.Case ^ case_goal,
+        goal_rewrite_recursive_call(Info, !Goal, FixupGoalInfo),
+        !Case ^ case_goal := !.Goal
+    ).
+
+:- pred goals_fixup_goal_info(list(fixup_goal_info)::in, fixup_goal_info::out)
+    is det.
+
+goals_fixup_goal_info(List, Fixup) :-
+    ( list.contains(List, fixup_goal_info) ->
+        Fixup = fixup_goal_info
+    ;
+        Fixup = do_not_fixup_goal_info
+    ).
+
+%----------------------------------------------------------------------------%
+
+:- pred create_get_free_slot_goal(loop_control_info::in, prog_var::out,
+    hlds_goal::out, prog_varset::in, prog_varset::out,
+    vartypes::in, vartypes::out) is det.
+
+create_get_free_slot_goal(Info, LCSVar, Goal, !VarSet,
+        !VarTypes) :-
+    varset.new_named_var("LCS", LCSVar, !VarSet),
+    map.det_insert(LCSVar, loop_control_slot_var_type, !VarTypes),
+    LCVar = Info ^ lci_lc_var,
+    proc(PredId, ProcId) = Info ^ lci_wait_free_slot_proc,
+    SymName = Info ^ lci_wait_free_slot_proc_name,
+
+    GoalExpr = plain_call(PredId, ProcId, [LCVar, LCSVar], not_builtin, no,
+        SymName),
+    NonLocals = list_to_set([LCVar, LCSVar]),
+    InstmapDelta = instmap_delta_bind_var(LCSVar),
+    GoalInfo = impure_init_goal_info(NonLocals, InstmapDelta, detism_det),
+    Goal = hlds_goal(GoalExpr, GoalInfo).
+
+%----------------------------------------------------------------------------%
+
+:- type fixup_goal_info
+    --->    fixup_goal_info
+    ;       do_not_fixup_goal_info.
+
+    % Fixup goalinfo after performing the loop control transformation.
+    %
+:- pred fixup_goal_info(loop_control_info::in, hlds_goal::in, hlds_goal::out)
+    is det.
+
+fixup_goal_info(Info, hlds_goal(GoalExpr, !.GoalInfo),
+        hlds_goal(GoalExpr, !:GoalInfo)) :-
+    LCVar = Info ^ lci_lc_var,
+    some [!NonLocals] (
+        !:NonLocals = goal_info_get_nonlocals(!.GoalInfo),
+        insert(LCVar, !NonLocals),
+        goal_info_set_nonlocals(!.NonLocals, !GoalInfo)
+    ),
+    goal_info_set_purity(purity_impure, !GoalInfo).
+
+%----------------------------------------------------------------------------%
+
+:- pred update_outer_proc(pred_proc_id::in, pred_proc_id::in, sym_name::in,
+    module_info::in, proc_info::in, proc_info::out) is det.
+
+update_outer_proc(PredProcId, InnerPredProcId, InnerPredName, ModuleInfo,
+        !ProcInfo) :-
+    proc(PredId, _) = PredProcId,
+    module_info_pred_info(ModuleInfo, PredId, PredInfo),
+    pred_info_get_arg_types(PredInfo, HeadVarTypes),
+    proc_info_get_headvars(!.ProcInfo, HeadVars0),
+    proc_info_get_inferred_determinism(!.ProcInfo, Detism),
+    proc_info_get_goal(!.ProcInfo, OrigGoal),
+    OrigInstmapDelta = goal_info_get_instmap_delta(OrigGoal ^ hlds_goal_info),
+    some [!VarSet, !VarTypes] (
+        % Re-build the variables in the procedure with smaller sets.
+        varset.init(!:VarSet),
+        map.init(!:VarTypes),
+        proc_info_get_varset(!.ProcInfo, OldVarSet),
+        foldl3_corresponding(add_old_var_to_sets(OldVarSet), HeadVars0,
+            HeadVarTypes, !VarSet, !VarTypes, map.init, Remap),
+        map(map.lookup(Remap), HeadVars0, HeadVars),
+        proc_info_set_headvars(HeadVars, !ProcInfo),
+
+        % Fix rtti varmaps.
+        proc_info_get_rtti_varmaps(!.ProcInfo, RttiVarmaps0),
+        apply_substitutions_to_rtti_varmaps(map.init, map.init, Remap,
+            RttiVarmaps0, RttiVarmaps),
+        proc_info_set_rtti_varmaps(RttiVarmaps, !ProcInfo),
+
+        % Create a variable for the number of worker contexts. We control this
+        % in the compiler so that it can be adjusted using profiler feedback
+        % (for auto-parallelisation), but for now we just set it using a runtime
+        % call so that it can be tuned.
+        varset.new_named_var("NumContexts", NumContextsVar, !VarSet),
+        map.det_insert(NumContextsVar, builtin_type(builtin_type_int),
+            !VarTypes),
+        get_lc_default_num_contexts_proc(ModuleInfo, LCDefaultNumContextsPredId,
+            LCDefaultNumContextsProcId),
+        goal_info_init(list_to_set([NumContextsVar]),
+            instmap_delta_bind_var(NumContextsVar),
+            detism_det, purity_pure, GetNumContextsGoalInfo),
+        GetNumContextsGoal = hlds_goal(plain_call(LCDefaultNumContextsPredId,
+                LCDefaultNumContextsProcId, [NumContextsVar],
+                not_builtin, no, lc_default_num_contexts_name),
+            GetNumContextsGoalInfo),
+
+        % Create the call to lc_create
+        varset.new_named_var("LC", LCVar, !VarSet),
+        map.det_insert(LCVar, loop_control_var_type, !VarTypes),
+        get_lc_create_proc(ModuleInfo, LCCreatePredId, LCCreateProcId),
+        goal_info_init(list_to_set([NumContextsVar, LCVar]),
+            instmap_delta_bind_var(LCVar), detism_det, purity_pure,
+            LCCreateGoalInfo),
+        LCCreateGoal = hlds_goal(plain_call(LCCreatePredId,
+                LCCreateProcId, [NumContextsVar, LCVar], not_builtin, no,
+                lc_create_name),
+            LCCreateGoalInfo),
+
+        % Create the inner call.
+        InnerCallArgs = [LCVar | HeadVars],
+        NonLocals = list_to_set(InnerCallArgs),
+        % The call to the transformed body has the same instmap delta as the
+        % original body.
+        remap_instmap(Remap, OrigInstmapDelta, InstmapDelta),
+        goal_info_init(NonLocals, InstmapDelta, Detism, purity_impure,
+            InnerProcCallGoalInfo),
+        proc(InnerPredId, InnerProcId) = InnerPredProcId,
+        InnerProcCallGoal = hlds_goal(plain_call(InnerPredId, InnerProcId,
+            InnerCallArgs, not_builtin, no, InnerPredName),
+            InnerProcCallGoalInfo),
+
+        % Build a conjunction of these goals.
+        goal_info_init(list_to_set(HeadVars), InstmapDelta, Detism,
+            purity_impure, ConjGoalInfo),
+        ConjGoal = hlds_goal(conj(plain_conj,
+                [GetNumContextsGoal, LCCreateGoal, InnerProcCallGoal]),
+            ConjGoalInfo),
+
+        OrigPurity = goal_info_get_purity(OrigGoal ^ hlds_goal_info),
+        (
+            OrigPurity = purity_impure,
+            % The impurity introduced by this transformation does not need
+            % to be promised away.
+            Body = ConjGoal
+        ;
+            ( OrigPurity = purity_pure
+            ; OrigPurity = purity_semipure
+            ),
+            % Wrap the body in a scope to promise away the impurity.
+            goal_info_set_purity(purity_pure, ConjGoalInfo, ScopeGoalInfo),
+            Body = hlds_goal(scope(promise_purity(OrigPurity), ConjGoal),
+                ScopeGoalInfo)
+        ),
+
+        proc_info_set_goal(Body, !ProcInfo),
+        proc_info_set_varset(!.VarSet, !ProcInfo),
+        proc_info_set_vartypes(!.VarTypes, !ProcInfo)
+    ).
+
+:- pred add_old_var_to_sets(prog_varset::in, prog_var::in, mer_type::in,
+    prog_varset::in, prog_varset::out, vartypes::in, vartypes::out,
+    prog_var_renaming::in, prog_var_renaming::out) is det.
+
+add_old_var_to_sets(OldVarSet, OldVar, VarType, !VarSet, !VarTypes,
+        !Remap) :-
+    ( varset.search_name(OldVarSet, OldVar, Name) ->
+        varset.new_named_var(Name, Var, !VarSet)
+    ;
+        varset.new_var(Var, !VarSet)
+    ),
+    map.det_insert(Var, VarType, !VarTypes),
+    map.det_insert(OldVar, Var, !Remap).
+
+:- pred remap_instmap(map(prog_var, prog_var)::in,
+    instmap_delta::in, instmap_delta::out) is det.
+
+remap_instmap(Remap, OldInstmapDelta, !:InstmapDelta) :-
+    instmap_delta_to_assoc_list(OldInstmapDelta, VarInsts),
+    instmap_delta_init_reachable(!:InstmapDelta),
+    foldl((pred((OldVar - Inst)::in, IMD0::in, IMD::out) is det :-
+            map.lookup(Remap, OldVar, Var),
+            instmap_delta_set_var(Var, Inst, IMD0, IMD)
+        ), VarInsts, !InstmapDelta).
+
+%--------------------------------------------------------------------%
+
+:- func loop_control_var_type = mer_type.
+
+loop_control_var_type = defined_type(Sym, [], kind_star) :-
+    Sym = qualified(par_builtin_module_sym, "loop_control").
+
+:- func loop_control_slot_var_type = mer_type.
+
+loop_control_slot_var_type = defined_type(Sym, [], kind_star) :-
+    Sym = qualified(par_builtin_module_sym, "loop_control_slot").
+
+:- func lc_wait_free_slot_name = sym_name.
+
+lc_wait_free_slot_name =
+    qualified(par_builtin_module_sym, lc_wait_free_slot_name_unqualified).
+
+:- func lc_wait_free_slot_name_unqualified = string.
+
+lc_wait_free_slot_name_unqualified = "lc_wait_free_slot".
+
+:- pred get_wait_free_slot_proc(module_info::in, pred_proc_id::out) is det.
+
+get_wait_free_slot_proc(ModuleInfo, proc(PredId, ProcId)) :-
+    lookup_lc_pred_proc(ModuleInfo, lc_wait_free_slot_name_unqualified, 2,
+        PredId, ProcId).
+
+:- func lc_default_num_contexts_name_unqualified = string.
+
+lc_default_num_contexts_name_unqualified = "lc_default_num_contexts".
+
+:- func lc_default_num_contexts_name = sym_name.
+
+lc_default_num_contexts_name =
+    qualified(par_builtin_module_sym,
+        lc_default_num_contexts_name_unqualified).
+
+:- pred get_lc_default_num_contexts_proc(module_info::in, pred_id::out,
+    proc_id::out) is det.
+
+get_lc_default_num_contexts_proc(ModuleInfo, PredId, ProcId) :-
+    lookup_lc_pred_proc(ModuleInfo, lc_default_num_contexts_name_unqualified,
+        1, PredId, ProcId).
+
+:- func lc_create_name_unqualified = string.
+
+lc_create_name_unqualified = "lc_create".
+
+:- func lc_create_name = sym_name.
+
+lc_create_name =
+    qualified(par_builtin_module_sym, lc_create_name_unqualified).
+
+:- pred get_lc_create_proc(module_info::in, pred_id::out, proc_id::out) is det.
+
+get_lc_create_proc(ModuleInfo, PredId, ProcId) :-
+    lookup_lc_pred_proc(ModuleInfo, lc_create_name_unqualified, 2, PredId,
+        ProcId).
+
+:- pred lookup_lc_pred_proc(module_info::in, string::in, arity::in,
+    pred_id::out, proc_id::out) is det.
+
+lookup_lc_pred_proc(ModuleInfo, Sym, Arity, PredId, ProcId) :-
+    lookup_builtin_pred_proc_id(ModuleInfo, par_builtin_module_sym,
+        Sym, pf_predicate, Arity, only_mode, PredId, ProcId).
+
+:- func par_builtin_module_sym = sym_name.
+
+par_builtin_module_sym = unqualified("par_builtin").
+
+%----------------------------------------------------------------------------%
+:- end_module transform_hlds.par_loop_control.
+%----------------------------------------------------------------------------%
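
Reviewer's note on par_conj_loop_control/par_conj_loop_control2 above: the conjuncts are walked in reverse and each earlier conjunct contributes a "wait for free slot" call followed by a loop_control scope, prepended to the goals that follow. The C sketch below only illustrates the resulting goal order. The function name, the string encoding of goals, and NAME_LEN are assumptions made for illustration; none of them are part of the patch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

enum { NAME_LEN = 32 };

/*
** Illustrative only: fill `out` with the goal sequence produced for
** `num_conjs` parallel conjuncts (num_conjs >= 1), given in source order.
** `out` must have room for 2 * num_conjs - 1 entries.
** Returns the number of goals written.
*/
static int
sketch_loop_control_order(const char *conjs[], int num_conjs,
    char out[][NAME_LEN])
{
    int total = 2 * num_conjs - 1;
    int pos = total;    /* goals are prepended, so fill from the back */
    int i;

    /* The last conjunct (holding the recursive call) is not spawned off. */
    pos--;
    snprintf(out[pos], NAME_LEN, "%s", conjs[num_conjs - 1]);

    /* Earlier conjuncts, visited last to first. */
    for (i = num_conjs - 2; i >= 0; i--) {
        pos--;
        snprintf(out[pos], NAME_LEN, "scope(%s)", conjs[i]);
        pos--;
        snprintf(out[pos], NAME_LEN, "wait_free_slot");
    }
    return total;
}
```

For the conjuncts p & q & rec, this yields wait_free_slot, scope(p), wait_free_slot, scope(q), rec, which is the flat plain conjunction that create_conj_from_list receives.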
diff --git a/compiler/prog_util.m b/compiler/prog_util.m
index 9f6d19c..3dfdd99 100644
--- a/compiler/prog_util.m
+++ b/compiler/prog_util.m
@@ -75,6 +75,7 @@
     ;       newpred_type_subst(tvarset, type_subst)
     ;       newpred_unused_args(list(int))
     ;       newpred_parallel_args(list(int))
+    ;       newpred_parallel_loop_control
     ;       newpred_structure_reuse(int, list(int))     % Mode, no-clobber
                                                         % arguments.
     ;       newpred_distance_granularity(int).          % Distance
@@ -550,6 +551,9 @@ make_pred_name(ModuleName, Prefix, MaybePredOrFunc, PredName,
     ;
         NewPredId = newpred_distance_granularity(Distance),
         int_to_string(Distance, PredIdStr)
+    ;
+        NewPredId = newpred_parallel_loop_control,
+        PredIdStr = ""
     ),
 
     string.format("%s__%s__%s__%s",
diff --git a/compiler/saved_vars.m b/compiler/saved_vars.m
index 05abf89..e4cd697 100644
--- a/compiler/saved_vars.m
+++ b/compiler/saved_vars.m
@@ -251,6 +251,7 @@ ok_to_duplicate(feature_pretest_equality) = yes.
 ok_to_duplicate(feature_pretest_equality_condition) = yes.
 ok_to_duplicate(feature_lambda_undetermined_mode) = yes.
 ok_to_duplicate(feature_contains_stm_inner_outer) = yes.
+ok_to_duplicate(feature_do_not_tailcall) = no.
 
     % Divide a list of goals into an initial subsequence of goals
     % that construct constants, and all other goals.
diff --git a/compiler/transform_hlds.m b/compiler/transform_hlds.m
index f996237..6ca73ea 100644
--- a/compiler/transform_hlds.m
+++ b/compiler/transform_hlds.m
@@ -97,6 +97,7 @@
 :- include_module dep_par_conj.
 :- include_module parallel_to_plain_conj.
 :- include_module implicit_parallelism.
+:- include_module par_loop_control.
 
 :- include_module mmc_analysis.
 
diff --git a/doc/user_guide.texi b/doc/user_guide.texi
index 32c4788..5d9091d 100644
--- a/doc/user_guide.texi
+++ b/doc/user_guide.texi
@@ -912,7 +912,7 @@ The following variables can also appear in options files but are
 @item GCC_FLAGS
 @vindex GCC_FLAGS
 Options to pass to the C compiler, but only if the C compiler is GCC.
-If the C compiler is not GCC then this variable is ignored. 
+If the C compiler is not GCC then this variable is ignored.
 These options will be passed @emph{after} any options given by the
 @samp{CFLAGS} variable.
 
@@ -9520,6 +9520,27 @@ mdprof_create_feedback. The profiling feedback file can be specified using the
 Use the specified profiling feedback file
 which may currently only be processed for implicit parallelism.
 
+@c @sp 1
+@c @item --loop-control
+@c @findex --loop-control
+@c Enable the loop control transformation for parallel conjunctions.
+@c This causes right-recursive parallel conjunctions to use fewer contexts while
+@c maintaining parallelism.
+@c This transformation is under development; when it is ready it will
+@c probably be enabled by default.
+@c
+@c @sp 1
+@c @item --no-loop-control-preserve-tail-recursion
+@c @findex --no-loop-control-preserve-tail-recursion
+@c Do not attempt to preserve tail recursion in the loop control transformation.
+@c This option causes all code spawned off using loop control to access its
+@c parent stack frame through the parent stack pointer,
+@c rather than copying (parts of) the stack frame into the child's stack frame
+@c and reading it from there.
+@c This allows us to compare the cost of copying the stack frame with the cost of
+@c non-tail-recursive code.
+@c It is intended for developers only.
+
 @end table
 
 @node Target code compilation options
@@ -10083,6 +10104,14 @@ This only has an effect if the executable was built in a low-level C parallel
 grade.
 
 @c @sp 1
+@c @item --num-contexts-per-lc-per-thread @var{num}
+@c @findex --num-contexts-per-lc-per-thread (runtime option)
+@c Tells the runtime system to use @var{num} contexts per POSIX thread to handle
+@c each loop controlled loop.
+@c This only has an effect if the executable was built in a low-level C parallel
+@c grade.
+@c
+@c @sp 1
 @c @item --runtime-granularity-wsdeque-length-factor @var{factor}
 @c @findex --runtime-granularity-wsdeque-length-factor (runtime option)
 @c Configures the runtime granularity control method not to create sparks if a
diff --git a/library/par_builtin.m b/library/par_builtin.m
index 82e6d4a..c5113c8 100644
--- a/library/par_builtin.m
+++ b/library/par_builtin.m
@@ -7,7 +7,7 @@
 %---------------------------------------------------------------------------%
 %
 % File: par_builtin.m.
-% Main authors: wangp.
+% Main authors: wangp, pbone.
 % Stability: low.
 %
 % This file is automatically imported, as if via `use_module', into every
@@ -89,16 +89,27 @@
 
     % Allocate a free slot from the loop control structure and return it.
     % For documentation, see MR_lc_try_get_free_slot in mercury_par_builtin.h
+    % This call fails if there is no free slot available.
     %
 :- impure pred lc_free_slot(loop_control::in, loop_control_slot::out)
     is semidet.
 
+    % Allocate a free slot from the loop control structure and return it.
+    % This call blocks the context until a free slot is available.
+    %
+:- impure pred lc_wait_free_slot(loop_control::in, loop_control_slot::out)
+    is det.
+
     % Finish one iteration of the loop. This call does not return.
     % For documentation, see MR_lc_join_and_terminate in mercury_par_builtin.h.
     %
 :- impure pred lc_join_and_terminate(loop_control::in, loop_control_slot::in)
     is det.
 
+    % Get the default number of contexts to use for loop control.
+    %
+:- impure pred lc_default_num_contexts(int::out) is det.
+
 %-----------------------------------------------------------------------------%
 
 % The following predicates are intended to be used as conditions to decide
@@ -305,6 +316,7 @@ INIT mercury_sys_init_par_builtin_modules
 % predicates.
 
 :- external(lc_finish/1).
+:- external(lc_wait_free_slot/2).
 
 :- pragma foreign_code("C",
 "
@@ -325,9 +337,16 @@ mercury__par_builtin__lc_finish_1_p_0(MR_Box lc)
     MR_fatal_error(""lc_finish is unavailable with --highlevel-code"");
 }
 
+void MR_CALL
+mercury__par_builtin__lc_wait_free_slot_2_p_0(MR_Box lc, MR_Box lcs)
+{
+    MR_fatal_error(""lc_wait_free_slot is unavailable with --highlevel-code"");
+}
+
 #else /* ! MR_HIGHLEVEL_CODE */
 
 MR_def_extern_entry(par_builtin__lc_finish_1_0)
+MR_def_extern_entry(par_builtin__lc_wait_free_slot_2_0)
 
 MR_decl_label1(par_builtin__lc_finish_1_0, 1)
 
@@ -353,7 +372,7 @@ MR_define_entry(mercury__par_builtin__lc_finish_1_0)
         MR_LoopControl  *LC;
         
         LC = (MR_LoopControl *) MR_r1;
-        MR_lc_finish_part1(LC, par_builtin__lc_finish_1_0_i1);
+        MR_lc_finish_part1(LC, MR_LABEL_AP(par_builtin__lc_finish_1_0_i1));
     }
 #else
     MR_fatal_error(""lc_finish is unavailable in this grade"");
@@ -380,6 +399,40 @@ MR_def_label(par_builtin__lc_finish_1_0,1)
 #endif
 MR_END_MODULE
 
+MR_BEGIN_MODULE(par_builtin_module_lc_wait_free_slot)
+    MR_init_entry1(par_builtin__lc_wait_free_slot_2_0);
+    MR_INIT_PROC_LAYOUT_ADDR(mercury__par_builtin__lc_wait_free_slot_2_0);
+MR_BEGIN_CODE
+
+#ifdef MR_maybe_local_thread_engine_base
+    #undef MR_maybe_local_thread_engine_base
+    #define MR_maybe_local_thread_engine_base MR_local_thread_engine_base
+#endif
+
+MR_define_entry(mercury__par_builtin__lc_wait_free_slot_2_0)
+    MR_MAYBE_INIT_LOCAL_THREAD_ENGINE_BASE
+
+#if defined(MR_LL_PARALLEL_CONJ)
+    {
+        MR_LoopControl *lc;
+        MR_LoopControlSlot *lcs;
+
+        lc = (MR_LoopControl *) MR_r1;
+        MR_lc_wait_free_slot(lc, lcs, par_builtin__lc_wait_free_slot_2_0);
+        MR_r1 = (MR_Word)lcs;
+    }
+#else
+    MR_fatal_error(""lc_wait_free_slot is unavailable in this grade"");
+#endif
+
+    MR_proceed();
+
+#ifdef MR_maybe_local_thread_engine_base
+    #undef MR_maybe_local_thread_engine_base
+    #define MR_maybe_local_thread_engine_base MR_thread_engine_base
+#endif
+MR_END_MODULE
+
 #endif /* ! MR_HIGHLEVEL_CODE */
 
 /*
@@ -394,6 +447,7 @@ mercury_sys_init_lc_init(void)
 {
 #ifndef MR_HIGHLEVEL_CODE
     par_builtin_module_lc_finish();
+    par_builtin_module_lc_wait_free_slot();
 #endif
 }
 
@@ -437,6 +491,13 @@ mercury_sys_init_lc_write_out_proc_statics(FILE *deep_fp,
 #endif
 ").
 
+:- pragma foreign_proc("C",
+    lc_default_num_contexts(NumContexts::out),
+    [will_not_call_mercury, will_not_throw_exception, thread_safe],
+"
+    NumContexts = MR_num_contexts_per_loop_control;
+").
+
 %-----------------------------------------------------------------------------%
 
 :- pragma foreign_proc("C",
diff --git a/mdbcomp/mdbcomp.goal_path.m b/mdbcomp/mdbcomp.goal_path.m
index 286fb2d..2eed4ee 100644
--- a/mdbcomp/mdbcomp.goal_path.m
+++ b/mdbcomp/mdbcomp.goal_path.m
@@ -123,6 +123,18 @@
 :- pred goal_path_get_last(forward_goal_path::in, goal_path_step::out)
     is semidet.
 
+    % Remove the first item from the goal path, returning it and the new
+    % goal path.
+    %
+:- pred goal_path_remove_first(forward_goal_path::in, forward_goal_path::out,
+    goal_path_step::out) is semidet.
+
+    % Get the first item from the goal path. This fails if the goal path is
+    % empty.
+    %
+:- pred goal_path_get_first(forward_goal_path::in, goal_path_step::out)
+    is semidet.
+
     % Remove the last item from the goal path, returning it and the new
     % goal path.
     %
@@ -356,6 +368,12 @@ goal_path_last_loop(Head, fgp_nil, Head).
 goal_path_last_loop(_Head, fgp_cons(TailHead, TailTail), LastStep) :-
     goal_path_last_loop(TailHead, TailTail, LastStep).
 
+goal_path_remove_first(fgp_cons(FirstStep, OtherSteps), OtherSteps,
+    FirstStep).
+
+goal_path_get_first(GoalPath, FirstStep) :-
+    goal_path_remove_first(GoalPath, _, FirstStep).
+
 rev_goal_path_remove_last(rgp_cons(GoalPath, LastStep), GoalPath, LastStep).
 
 rev_goal_path_get_last(rgp_cons(_, LastStep), LastStep).
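
Reviewer's note on goal_path_remove_first/goal_path_get_first above: both are semidet because they only match fgp_cons and so fail on an empty path. A minimal C sketch of the same behaviour on a singly linked list follows. The GoalPath struct, the int stand-in for goal_path_step, and the use of NULL for fgp_nil are assumptions made for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct GoalPath {
    int                     step;   /* stand-in for goal_path_step */
    const struct GoalPath   *rest;  /* NULL plays the role of fgp_nil */
} GoalPath;

/*
** Return false (failure) on an empty path; otherwise return the first
** step and the remaining path through the output arguments.
*/
static bool
goal_path_remove_first(const GoalPath *path, const GoalPath **rest,
    int *first)
{
    if (path == NULL) {
        return false;
    }
    *first = path->step;
    *rest = path->rest;
    return true;
}

/* Defined in terms of remove_first, as in the patch. */
static bool
goal_path_get_first(const GoalPath *path, int *first)
{
    const GoalPath *ignored;

    return goal_path_remove_first(path, &ignored, first);
}
```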
diff --git a/mdbcomp/program_representation.m b/mdbcomp/program_representation.m
index b4132aa..ee7e98c 100644
--- a/mdbcomp/program_representation.m
+++ b/mdbcomp/program_representation.m
@@ -1579,6 +1579,7 @@ pred_is_external("builtin", "compare_representation", 4).
 pred_is_external("backjump", "builtin_choice_id", 1).
 pred_is_external("backjump", "builtin_backjump", 1).
 pred_is_external("par_builtin", "lc_finish", 1).
+pred_is_external("par_builtin", "lc_wait_free_slot", 2).
 
 %-----------------------------------------------------------------------------%
 
diff --git a/runtime/mercury_par_builtin.c b/runtime/mercury_par_builtin.c
index dbc0f09..7a49dd1 100644
--- a/runtime/mercury_par_builtin.c
+++ b/runtime/mercury_par_builtin.c
@@ -48,6 +48,7 @@ MR_lc_create(unsigned num_workers)
 
     lc = MR_GC_malloc(sizeof(MR_LoopControl) +
         (num_workers-1) * sizeof(MR_LoopControlSlot));
+    lc->MR_lc_num_slots = num_workers;
     for (i = 0; i < num_workers; i++) {
         /*
         ** We allocate contexts as necessary, so that we never allocate a
@@ -57,14 +58,17 @@ MR_lc_create(unsigned num_workers)
         lc->MR_lc_slots[i].MR_lcs_context = NULL;
         lc->MR_lc_slots[i].MR_lcs_is_free = MR_TRUE;
     }
-    lc->MR_lc_num_slots = num_workers;
     lc->MR_lc_outstanding_workers = 0;
-    lc->MR_lc_waiting_context = NULL;
-    pthread_mutex_init(&(lc->MR_lc_lock), MR_MUTEX_ATTR);
+    lc->MR_lc_master_context_lock = MR_US_LOCK_INITIAL_VALUE;
+    lc->MR_lc_master_context = NULL;
+    lc->MR_lc_finished = MR_FALSE;
 
     return lc;
 }
 
+/*
+** Deprecated: this was part of our old loop control design.
+*/
 MR_LoopControlSlot *
 MR_lc_try_get_free_slot(MR_LoopControl* lc)
 {
@@ -113,40 +117,31 @@ MR_lc_join(MR_LoopControl* lc, MR_LoopControlSlot* lcs)
     MR_Context  *wakeup_context;
 
     lcs->MR_lcs_is_free = MR_TRUE;
+    /* Ensure the slot is free before we perform the decrement. */
+    MR_CPU_SFENCE;
     last_worker =
         MR_atomic_dec_and_is_zero_int(&(lc->MR_lc_outstanding_workers));
 
     /*
-    ** This barrier ensures we update MR_lc_outstanding_contexts before
-    ** we read MR_lc_finished. It works together with another barrier
-    ** in MR_lc_finish(). Together these barriers prevent a race whereby
-    ** the original thread is not resumed because MR_lc_finished looked false
-    ** in the condition below but last_worker was true, and the original
-    ** thread is about to go to sleep.
-    **
-    ** We go through these checks to avoid taking the lock in the then branch
-    ** below in cases when MR_lc_outstanding_workers is zero but the original
-    ** thread has not called MR_lc_finish() yet.
+    ** If the master thread is suspended, wake it up, provided that either:
+    **   - the loop has finished and this is the last worker to exit, or
+    **   - the loop has not finished (so the master can create more work).
     */
-    MR_CPU_MFENCE;
-    if (last_worker && lc->MR_lc_finished) {
-        /*
-        ** Wake up the first thread if it is sleeping.
-        ** XXX: a spinlock would do here, or maybe a CAS;
-        ** we never hold the lock for long.
-        */
-        MR_LOCK(&(lc->MR_lc_lock), "MC_lc_join_and_terminate");
-        wakeup_context = lc->MR_lc_waiting_context;
+    if (lc->MR_lc_master_context &&
+        ((lc->MR_lc_finished && last_worker) ||
+         (!lc->MR_lc_finished))) {
         /*
-        ** We don't need to clear the context field at this point: only one
-        ** worker can ever be the last worker, and therefore there is no danger
-        ** in adding this context to the run queue twice.
+        ** Now take a lock and re-read the master context field.
         */
-        MR_UNLOCK(&(lc->MR_lc_lock), "MR_lc_join_and_terminate");
+        MR_US_SPIN_LOCK(&(lc->MR_lc_master_context_lock));
+        wakeup_context = lc->MR_lc_master_context;
+        lc->MR_lc_master_context = NULL;
+        MR_US_UNLOCK(&(lc->MR_lc_master_context_lock));
         if (wakeup_context != NULL) {
             /*
             ** XXX: it is faster to switch to this context ourselves
             ** since we are going to unload our own context.
+            ** Or we should switch to another worker context if there is one.
             */
             MR_schedule_context(wakeup_context);
         }
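
Reviewer's note on the new wake-up test in MR_lc_join above: the unlocked pre-check can be read as a pure predicate over three booleans. The sketch below restates it and a logically equivalent shorter form. The function and parameter names are assumptions made for illustration; the real code reads the fields of MR_LoopControl and then re-reads MR_lc_master_context under the spin lock before scheduling anything.

```c
#include <assert.h>
#include <stdbool.h>

/* The condition as written in the patch. */
static bool
should_wake_master(bool master_suspended, bool finished, bool last_worker)
{
    return master_suspended &&
        ((finished && last_worker) || !finished);
}

/*
** Equivalent form: (F && L) || !F is the same as L || !F.
** Shown only to make the intent easier to check by eye.
*/
static bool
should_wake_master_simplified(bool master_suspended, bool finished,
    bool last_worker)
{
    return master_suspended && (last_worker || !finished);
}
```

The only case in which a suspended master is left asleep is a finished loop with workers still outstanding.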
diff --git a/runtime/mercury_par_builtin.h b/runtime/mercury_par_builtin.h
index 24073f0..a318016 100644
--- a/runtime/mercury_par_builtin.h
+++ b/runtime/mercury_par_builtin.h
@@ -303,11 +303,14 @@ struct MR_LoopControlSlot_Struct
 struct MR_LoopControl_Struct
 {
     unsigned                                MR_lc_num_slots;
+    /* Outstanding workers is manipulated with atomic instructions */
     MR_THREADSAFE_VOLATILE MR_Integer       MR_lc_outstanding_workers;
-    MR_Context                              *MR_lc_waiting_context;
+    /* This lock protects only the next field */
+    MR_THREADSAFE_VOLATILE MR_Us_Lock       MR_lc_master_context_lock;
+    MR_Context                              *MR_lc_master_context;
+    /* Unused atm */
     MR_THREADSAFE_VOLATILE MR_bool          MR_lc_finished;
-    MercuryLock                             MR_lc_lock;
     MR_LoopControlSlot                      MR_lc_slots[1];
 };
 
 #else
@@ -355,18 +358,20 @@ extern MR_LoopControl   *MR_lc_create(unsigned num_workers);
             ** This must be implemented as a macro, since we cannot move    \
             ** the C stack pointer without extra work.                      \
             */                                                              \
-            MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume =                \
-                MR_LABEL(MR_add_prefix(part2_label));                       \
-            MR_LOCK(&((lc)->MR_lc_lock), "MR_lc_finish_part1");             \
-            if ((lc)->MR_lc_outstanding_workers == 0) {                     \
-                MR_UNLOCK(&((lc)->MR_lc_lock), "MR_lc_finish_part1");       \
-                MR_GOTO_LOCAL(MR_add_prefix(part2_label));                  \
+            MR_US_SPIN_LOCK(&((lc)->MR_lc_master_context_lock));            \
+            if ((lc)->MR_lc_outstanding_workers != 0) {                     \
+                MR_save_context(MR_ENGINE(MR_eng_this_context));            \
+                MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume_owner_engine = \
+                    MR_ENGINE(MR_eng_id);                                   \
+                MR_ENGINE(MR_eng_this_context)->MR_ctxt_resume =            \
+                    (part2_label);                                          \
+                (lc)->MR_lc_master_context = MR_ENGINE(MR_eng_this_context);\
+                MR_US_UNLOCK(&((lc)->MR_lc_master_context_lock));           \
+                MR_ENGINE(MR_eng_this_context) = NULL;                      \
+                MR_idle(); /* Release the engine to the idle loop */        \
             }                                                               \
-            MR_save_context(MR_ENGINE(MR_eng_this_context));                \
-            (lc)->MR_lc_waiting_context = MR_ENGINE(MR_eng_this_context);   \
-            MR_UNLOCK(&((lc)->MR_lc_lock), "MR_lc_finish_part1");           \
-            MR_ENGINE(MR_eng_this_context) = NULL;                          \
-            MR_idle(); /* Release the engine to the idle loop */            \
+            MR_US_UNLOCK(&((lc)->MR_lc_master_context_lock));               \
+            /* Fall through to part2 */                                     \
         }                                                                   \
     } while (0);
 
@@ -382,15 +387,70 @@ extern MR_LoopControl   *MR_lc_create(unsigned num_workers);
                 MR_destroy_context((lc)->MR_lc_slots[i].MR_lcs_context);    \
             }                                                               \
         }                                                                   \
-        pthread_mutex_destroy(&((lc)->MR_lc_lock));                         \
     } while (0);
 
 /*
 ** Get a free slot in the loop control if there is one.
+**
+** Deprecated: this was part of our old loop control design.
 */
 extern MR_LoopControlSlot* MR_lc_try_get_free_slot(MR_LoopControl* lc);
 
 /*
+** Get a free slot in the loop control, or block until one is available.
+*/
+#define MR_lc_wait_free_slot(lc, lcs, retry_label)                          \
+    do {                                                                    \
+        unsigned    i;                                                      \
+                                                                            \
+        if ((lc)->MR_lc_outstanding_workers == (lc)->MR_lc_num_slots) {     \
+            MR_US_SPIN_LOCK(&((lc)->MR_lc_master_context_lock));            \
+            /*                                                              \
+            ** Re-check outstanding workers while holding the lock.  This   \
+            ** ensures that we only commit to sleeping while holding the    \
+            ** lock, but if there were a worker available we wouldn't need  \
+            ** to take the lock at all.                                     \
+            */                                                              \
+            if ((lc)->MR_lc_outstanding_workers == (lc)->MR_lc_num_slots) { \
+                MR_Context *ctxt;                                           \
+                                                                            \
+                /*                                                          \
+                ** Block this context and have it retry once it's           \
+                ** unblocked                                                \
+                */                                                          \
+                ctxt = MR_ENGINE(MR_eng_this_context);                      \
+                (lc)->MR_lc_master_context = ctxt;                          \
+                MR_save_context(ctxt);                                      \
+                ctxt->MR_ctxt_resume = MR_add_prefix(retry_label);          \
+                ctxt->MR_ctxt_resume_owner_engine = MR_ENGINE(MR_eng_id);   \
+                MR_US_UNLOCK(&((lc)->MR_lc_master_context_lock));           \
+                MR_ENGINE(MR_eng_this_context) = NULL;                      \
+                MR_idle();                                                  \
+            }                                                               \
+            MR_US_UNLOCK(&((lc)->MR_lc_master_context_lock));               \
+        }                                                                   \
+                                                                            \
+        /*                                                                  \
+        ** XXX: Optimize this by starting the search from a hint.           \
+        */                                                                  \
+        for (i = 0; i<(lc)->MR_lc_num_slots; i++) {                         \
+            if ((lc)->MR_lc_slots[i].MR_lcs_is_free) {                      \
+                (lc)->MR_lc_slots[i].MR_lcs_is_free = MR_FALSE;             \
+                MR_atomic_inc_int(&((lc)->MR_lc_outstanding_workers));      \
+                (lcs) = &((lc)->MR_lc_slots[i]);                            \
+                break;                                                      \
+            }                                                               \
+        }                                                                   \
+                                                                            \
+        /* Only one context can ever run MR_lc_wait_free_slot, so no other  \
+        ** engine can increment outstanding workers and the search above    \
+        ** should never fail. */                                            \
+        if (i == (lc)->MR_lc_num_slots) {                                   \
+            MR_fatal_error("No free slot found in loop control");           \
+        }                                                                   \
+    } while (0);
+
+/*
 ** Try to spawn off this code using the free slot.
 */
 #define MR_lc_spawn_off(lcs, label) \
diff --git a/runtime/mercury_wrapper.c b/runtime/mercury_wrapper.c
index 8b7b877..88fb1a7 100644
--- a/runtime/mercury_wrapper.c
+++ b/runtime/mercury_wrapper.c
@@ -216,6 +216,12 @@ MR_Unsigned MR_max_contexts_per_thread = 2;
 #endif
 MR_Unsigned MR_max_outstanding_contexts;
 
+/*
+** The number of contexts per loop control per thread.
+*/
+MR_Unsigned MR_num_contexts_per_loop_control_per_thread = 4;
+MR_Unsigned MR_num_contexts_per_loop_control;
+
 /* file names for mdb's debugger I/O streams */
 const char  *MR_mdb_in_filename = NULL;
 const char  *MR_mdb_out_filename = NULL;
@@ -629,6 +635,8 @@ mercury_runtime_init(int argc, char **argv)
     MR_init_context_stuff();
     MR_init_thread_stuff();
     MR_max_outstanding_contexts = MR_max_contexts_per_thread * MR_num_threads;
+    MR_num_contexts_per_loop_control =
+        MR_num_contexts_per_loop_control_per_thread * MR_num_threads;
 #ifdef MR_LL_PARALLEL_CONJ
     MR_granularity_wsdeque_length = MR_granularity_wsdeque_length_factor * MR_num_threads;
 #endif
@@ -1298,6 +1306,7 @@ enum MR_long_option {
     MR_GEN_NONDETSTACK_REDZONE_SIZE,
     MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS,
     MR_MAX_CONTEXTS_PER_THREAD,
+    MR_NUM_CONTEXTS_PER_LC_PER_THREAD,
     MR_RUNTIME_GRANULAITY_WSDEQUE_LENGTH_FACTOR,
     MR_WORKSTEAL_MAX_ATTEMPTS,
     MR_WORKSTEAL_SLEEP_MSECS,
@@ -1400,6 +1409,7 @@ struct MR_option MR_long_opts[] = {
     { "gen-nondetstack-zone-size-kwords",
         1, 0, MR_GEN_NONDETSTACK_REDZONE_SIZE_KWORDS },
     { "max-contexts-per-thread",        1, 0, MR_MAX_CONTEXTS_PER_THREAD },
+    { "num-contexts-per-lc-per-thread", 1, 0, MR_NUM_CONTEXTS_PER_LC_PER_THREAD },
     { "runtime-granularity-wsdeque-length-factor", 1, 0,
         MR_RUNTIME_GRANULAITY_WSDEQUE_LENGTH_FACTOR },
     { "thread-pinning",                 0, 0, MR_THREAD_PINNING },
@@ -1824,6 +1834,14 @@ MR_process_options(int argc, char **argv)
                 MR_max_contexts_per_thread = size;
                 break;
 
+            case MR_NUM_CONTEXTS_PER_LC_PER_THREAD:
+                if (sscanf(MR_optarg, "%lu", &size) != 1) {
+                    MR_usage();
+                }
+
+                MR_num_contexts_per_loop_control_per_thread = size;
+                break;
+
             case MR_RUNTIME_GRANULAITY_WSDEQUE_LENGTH_FACTOR:
 #if defined(MR_LL_PARALLEL_CONJ)
                 if (sscanf(MR_optarg, "%"MR_INTEGER_LENGTH_MODIFIER"u",
diff --git a/runtime/mercury_wrapper.h b/runtime/mercury_wrapper.h
index 59b5af8..25593a3 100644
--- a/runtime/mercury_wrapper.h
+++ b/runtime/mercury_wrapper.h
@@ -259,6 +259,11 @@ extern	MR_Unsigned	MR_contexts_per_thread;
 */
 extern	MR_Unsigned	MR_max_outstanding_contexts;
 
+/*
+** The number of contexts to create for each loop-controlled loop.
+*/
+extern  MR_Unsigned MR_num_contexts_per_loop_control;
+
 extern  MR_Unsigned MR_num_threads;
 
 #if defined(MR_THREAD_SAFE) && defined(MR_LL_PARALLEL_CONJ)
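
A note for reviewers on the slot search in MR_lc_wait_free_slot: the intent
described in its comments (claim the first free slot, increment the
outstanding worker count, and treat "no free slot" as the only error case) can
be sketched in standalone C as below.  This is a minimal sketch under stated
assumptions, not the runtime's code: LoopControl, Slot, lc_init,
lc_take_free_slot and lc_release_slot are hypothetical names, C11 atomics
stand in for MR_atomic_inc_int, and a NULL return stands in for the call to
MR_fatal_error.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define NUM_SLOTS 4     /* hypothetical fixed size, for illustration only */

typedef struct {
    bool        is_free;
} Slot;

typedef struct {
    unsigned    num_slots;
    atomic_long outstanding_workers;    /* updated with atomic operations */
    Slot        slots[NUM_SLOTS];
} LoopControl;

/* Mark every slot as free and reset the worker count. */
static void
lc_init(LoopControl *lc)
{
    unsigned    i;

    lc->num_slots = NUM_SLOTS;
    atomic_init(&lc->outstanding_workers, 0);
    for (i = 0; i < lc->num_slots; i++) {
        lc->slots[i].is_free = true;
    }
}

/*
** Claim the first free slot.  Returns NULL only when the search reaches
** the end of the array without finding one.
*/
static Slot *
lc_take_free_slot(LoopControl *lc)
{
    unsigned    i;

    for (i = 0; i < lc->num_slots; i++) {
        if (lc->slots[i].is_free) {
            lc->slots[i].is_free = false;
            atomic_fetch_add(&lc->outstanding_workers, 1);
            return &lc->slots[i];
        }
    }
    return NULL;
}

/* Called when a worker finishes: give the slot back. */
static void
lc_release_slot(LoopControl *lc, Slot *slot)
{
    assert(!slot->is_free);
    slot->is_free = true;
    atomic_fetch_sub(&lc->outstanding_workers, 1);
}
```

In this sketch a caller that gets NULL back would block and retry later,
which is the role the retry label plays in the macro.  Only the single master
context takes slots, so the is_free flags themselves are not atomic here.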