diff --git a/mpisppy/cylinders/fwph_spoke.py b/mpisppy/cylinders/fwph_spoke.py index e1ac34585..d2eee80f0 100644 --- a/mpisppy/cylinders/fwph_spoke.py +++ b/mpisppy/cylinders/fwph_spoke.py @@ -27,6 +27,9 @@ def sync(self): # Tell the hub about the most recent bound self.send_bound(self.opt._local_bound) + # Update the nonant bounds, if possible + self.receive_nonant_bounds() + def finalize(self): # The FWPH spoke can call "finalize" before it # even starts doing anything, so its possible diff --git a/mpisppy/cylinders/hub.py b/mpisppy/cylinders/hub.py index 333781882..384069e9f 100644 --- a/mpisppy/cylinders/hub.py +++ b/mpisppy/cylinders/hub.py @@ -12,22 +12,21 @@ import mpisppy.log from mpisppy.cylinders.spcommunicator import RecvArray, SPCommunicator -from math import inf from mpisppy import global_toc from mpisppy.cylinders.spwindow import Field # Could also pass, e.g., sys.stdout instead of a filename -mpisppy.log.setup_logger("mpisppy.cylinders.Hub", +mpisppy.log.setup_logger(__name__, "hub.log", level=logging.CRITICAL) -logger = logging.getLogger("mpisppy.cylinders.Hub") +logger = logging.getLogger(__name__) class Hub(SPCommunicator): send_fields = (*SPCommunicator.send_fields, Field.SHUTDOWN, Field.BEST_OBJECTIVE_BOUNDS,) - receive_fields = (*SPCommunicator.receive_fields, Field.OBJECTIVE_INNER_BOUND, Field.OBJECTIVE_OUTER_BOUND, ) + receive_fields = (*SPCommunicator.receive_fields,) _hub_algo_best_bound_provider = False @@ -37,16 +36,10 @@ def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, communic logger.debug(f"Built the hub object on global rank {fullcomm.Get_rank()}") # for logging self.print_init = True - self.latest_ib_char = None - self.latest_ob_char = None - self.last_ib_idx = None - self.last_ob_idx = None # for termination based on stalling out self.stalled_iter_cnt = 0 self.last_gap = float('inf') # abs_gap tracker - self.initialize_bound_values() - return @abc.abstractmethod @@ -175,72 +168,6 @@ def hub_finalize(self): global_toc("Statistics at termination", True) self.screen_trace() - def receive_innerbounds(self): - """ Get inner bounds from inner bound spokes - NOTE: Does not check if there _are_ innerbound spokes - (but should be harmless to call if there are none) - """ - logging.debug("Hub is trying to receive from InnerBounds") - for idx, cls, recv_buf in self.receive_field_spcomms[Field.OBJECTIVE_INNER_BOUND]: - is_new = self.get_receive_buffer(recv_buf, Field.OBJECTIVE_INNER_BOUND, idx) - if is_new: - bound = recv_buf[0] - logging.debug("!! new InnerBound to opt {}".format(bound)) - self.BestInnerBound = self.InnerBoundUpdate(bound, cls, idx) - logging.debug("ph back from InnerBounds") - - def receive_outerbounds(self): - """ Get outer bounds from outer bound spokes - NOTE: Does not check if there _are_ outerbound spokes - (but should be harmless to call if there are none) - """ - logging.debug("Hub is trying to receive from OuterBounds") - for idx, cls, recv_buf in self.receive_field_spcomms[Field.OBJECTIVE_OUTER_BOUND]: - is_new = self.get_receive_buffer(recv_buf, Field.OBJECTIVE_OUTER_BOUND, idx) - if is_new: - bound = recv_buf[0] - logging.debug("!! new OuterBound to opt {}".format(bound)) - self.BestOuterBound = self.OuterBoundUpdate(bound, cls, idx) - logging.debug("ph back from OuterBounds") - - def OuterBoundUpdate(self, new_bound, cls=None, idx=None, char='*'): - current_bound = self.BestOuterBound - if self._outer_bound_update(new_bound, current_bound): - if cls is None: - self.latest_ob_char = char - self.last_ob_idx = 0 - else: - self.latest_ob_char = cls.converger_spoke_char - self.last_ob_idx = idx - return new_bound - else: - return current_bound - - def InnerBoundUpdate(self, new_bound, cls=None, idx=None, char='*'): - current_bound = self.BestInnerBound - if self._inner_bound_update(new_bound, current_bound): - if cls is None: - self.latest_ib_char = char - self.last_ib_idx = 0 - else: - self.latest_ib_char = cls.converger_spoke_char - self.last_ib_idx = idx - return new_bound - else: - return current_bound - - def initialize_bound_values(self): - if self.opt.is_minimizing: - self.BestInnerBound = inf - self.BestOuterBound = -inf - self._inner_bound_update = lambda new, old : (new < old) - self._outer_bound_update = lambda new, old : (new > old) - else: - self.BestInnerBound = -inf - self.BestOuterBound = inf - self._inner_bound_update = lambda new, old : (new > old) - self._outer_bound_update = lambda new, old : (new < old) - def _populate_boundsout_cache(self, buf): """ Populate a given buffer with the current bounds """ @@ -292,6 +219,7 @@ def send_terminate(self): return def sync_bounds(self): + self.receive_nonant_bounds() self.receive_outerbounds() self.receive_innerbounds() self.send_boundsout() diff --git a/mpisppy/cylinders/lagrangian_bounder.py b/mpisppy/cylinders/lagrangian_bounder.py index d35c974a7..0eac149e5 100644 --- a/mpisppy/cylinders/lagrangian_bounder.py +++ b/mpisppy/cylinders/lagrangian_bounder.py @@ -20,6 +20,8 @@ def lagrangian_prep(self): self.opt._create_solvers() def lagrangian(self, need_solution=True, warmstart=False): + # update the nonant bounds, if possible, for a tighter relaxation + self.receive_nonant_bounds() verbose = self.opt.options['verbose'] # This is sort of a hack, but might help folks: if "ipopt" in self.opt.options["solver_name"]: @@ -59,12 +61,21 @@ class LagrangianOuterBound(_LagrangianMixin, mpisppy.cylinders.spoke.OuterBoundW converger_spoke_char = 'L' - def _set_weights_and_solve(self, need_solution=True): + def _set_weights_and_solve(self, need_solution): self.opt.W_from_flat_list(self.localWs) # Sets the weights return self.lagrangian(need_solution=need_solution) + def do_while_waiting_for_new_Ws(self, need_solution, warmstart=False): + if self.opt.options.get("subgradient_while_waiting", False): + # compute a subgradient step + self.opt.Compute_Xbar(self.verbose) + self.opt.Update_W(self.verbose) + bound = self.lagrangian(need_solution=need_solution, warmstart=warmstart) + if bound is not None: + self.send_bound(bound) + def main(self, need_solution=False): - verbose = self.opt.options['verbose'] + self.verbose = self.opt.options['verbose'] extensions = self.opt.extensions is not None self.lagrangian_prep() @@ -94,10 +105,5 @@ def main(self, need_solution=False): if extensions: self.opt.extobject.enditer_after_sync() self.dk_iter += 1 - elif self.opt.options.get("subgradient_while_waiting", False): - # compute a subgradient step - self.opt.Compute_Xbar(verbose) - self.opt.Update_W(verbose) - bound = self.lagrangian(need_solution=need_solution, warmstart=True) - if bound is not None: - self.send_bound(bound) + else: + self.do_while_waiting_for_new_Ws(need_solution=need_solution, warmstart=True) diff --git a/mpisppy/cylinders/reduced_costs_spoke.py b/mpisppy/cylinders/reduced_costs_spoke.py index b38a92d65..b496b30aa 100644 --- a/mpisppy/cylinders/reduced_costs_spoke.py +++ b/mpisppy/cylinders/reduced_costs_spoke.py @@ -6,16 +6,18 @@ # All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for # full copyright and license information. ############################################################################### +import math import pyomo.environ as pyo import numpy as np from mpisppy.cylinders.lagrangian_bounder import LagrangianOuterBound from mpisppy.cylinders.spwindow import Field from mpisppy.utils.sputils import is_persistent -from mpisppy import MPI +from mpisppy import MPI, global_toc class ReducedCostsSpoke(LagrangianOuterBound): - send_fields = (*LagrangianOuterBound.send_fields, Field.EXPECTED_REDUCED_COST, Field.SCENARIO_REDUCED_COST,) + send_fields = (*LagrangianOuterBound.send_fields, Field.EXPECTED_REDUCED_COST, Field.SCENARIO_REDUCED_COST, + Field.NONANT_LOWER_BOUNDS, Field.NONANT_UPPER_BOUNDS,) receive_fields = (*LagrangianOuterBound.receive_fields,) converger_spoke_char = 'R' @@ -24,6 +26,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.bound_tol = self.opt.options['rc_bound_tol'] self.consensus_threshold = np.sqrt(self.bound_tol) + self._best_inner_bound = math.inf if self.opt.is_minimizing else -math.inf def register_send_fields(self) -> None: @@ -57,8 +60,46 @@ def register_send_fields(self) -> None: scenario_buffer_len += len(s._mpisppy_data.nonant_indices) self._scenario_rc_buffer = np.zeros(scenario_buffer_len) + self.initialize_bound_fields() + self.create_integer_variable_where() + return + def initialize_bound_fields(self): + self._nonant_lower_bounds = self.send_buffers[Field.NONANT_LOWER_BOUNDS].value_array() + self._nonant_upper_bounds = self.send_buffers[Field.NONANT_UPPER_BOUNDS].value_array() + + self._nonant_lower_bounds[:] = -np.inf + self._nonant_upper_bounds[:] = np.inf + + for s in self.opt.local_scenarios.values(): + scenario_lower_bounds = np.fromiter( + _lb_generator(s._mpisppy_data.nonant_indices.values()), + dtype=float, + count=len(s._mpisppy_data.nonant_indices), + ) + np.maximum(self._nonant_lower_bounds, scenario_lower_bounds, out=self._nonant_lower_bounds) + + scenario_upper_bounds = np.fromiter( + _ub_generator(s._mpisppy_data.nonant_indices.values()), + dtype=float, + count=len(s._mpisppy_data.nonant_indices), + ) + np.minimum(self._nonant_upper_bounds, scenario_upper_bounds, out=self._nonant_upper_bounds) + + self._best_ub_update_function_slope = np.full(len(self._nonant_upper_bounds), 0.0) + self._best_ub_update_function_intercept = np.full(len(self._nonant_upper_bounds), np.inf) + + self._best_lb_update_function_slope = np.full(len(self._nonant_lower_bounds), 0.0) + self._best_lb_update_function_intercept = np.full(len(self._nonant_lower_bounds), -np.inf) + + def create_integer_variable_where(self): + self._integer_variable_where = np.full(len(self._nonant_lower_bounds), False) + for s in self.opt.local_scenarios.values(): + for idx, xvar in enumerate(s._mpisppy_data.nonant_indices.values()): + if xvar.is_integer(): + self._integer_variable_where[idx] = True + @property def rc_global(self): return self.send_buffers[Field.EXPECTED_REDUCED_COST].value_array() @@ -84,17 +125,11 @@ def lagrangian_prep(self): same as base class, but relax the integer variables and attach the reduced cost suffix """ - # Split up PH_Prep? Prox option is important for APH. - # Seems like we shouldn't need the Lagrangian stuff, so attach_prox=False - # Scenarios are created here - self.opt.PH_Prep(attach_prox=False) - self.opt._reenable_W() - relax_integer_vars = pyo.TransformationFactory("core.relax_integer_vars") for s in self.opt.local_subproblems.values(): relax_integer_vars.apply_to(s) s.rc = pyo.Suffix(direction=pyo.Suffix.IMPORT) - self.opt._create_solvers(presolve=False) + super().lagrangian_prep() def lagrangian(self, need_solution=True, warmstart=False): if not need_solution: @@ -102,9 +137,15 @@ def lagrangian(self, need_solution=True, warmstart=False): bound = super().lagrangian(need_solution=need_solution, warmstart=False) if bound is not None: self.extract_and_store_reduced_costs() + self.update_bounding_functions(bound) + self.extract_and_store_updated_nonant_bounds(new_dual=True) return bound def extract_and_store_reduced_costs(self): + # if it's time, don't bother + if self.got_kill_signal(): + return + self.opt.Compute_Xbar() # NaN will signal that the x values do not agree in # every scenario, we can't extract an expected reduced @@ -176,6 +217,157 @@ def extract_and_store_reduced_costs(self): Field.SCENARIO_REDUCED_COST, ) + def update_bounding_functions(self, lr_outer_bound): + """ This method attempts to update the best function we've found + so far to prove given bounds. Because it's difficult in general + to know if one linear function dominates another within a range + (and the answer maybe inconclusive), we'll settle for evaluting + the function at the current inner bound. If that point does not + exist we will go some distance from the current lr_outer_bound. + """ + # if it's time, don't bother + if self.got_kill_signal(): + return + + self.receive_innerbounds() + + if math.isinf(self.BestInnerBound): + if self.opt.is_minimizing: + # inner_bound > outer_bound + test_inner_bound = max(lr_outer_bound*1.01, lr_outer_bound+1) + else: + # inner_bound < outer_bound + test_inner_bound = min(lr_outer_bound*0.99, lr_outer_bound-1) + else: + test_inner_bound = self.BestInnerBound + nonzero_rc = np.where(self.rc_global==0, np.nan, self.rc_global) + # the slope is (IB - OB) / reduced_costs -- if the reduced costs are non-zero (e.g., the bound is active) + bound_tightening = np.divide(test_inner_bound - lr_outer_bound, nonzero_rc) + + # (IB - OB) / reduced_costs + # for minimization, (IB - OB) > 0 + # reduced_cost > 0 --> lower bound active + # reduced_cost < 0 --> upper bound active + # for maximization, (IB - OB) < 0 + # reduced_cost < 0 --> lower bound active + # reduced_cost > 0 --> upper bound active + # regardless, + # (IB - OB) / reduced_cost > 0 --> lower bound active --> upper bound can be tightened + # (IB - OB) / reduced_cost < 0 --> upper bound active --> lower bound can be tightened + tighten_upper = np.where(bound_tightening>0, bound_tightening, np.nan) + tighten_lower = np.where(bound_tightening<0, bound_tightening, np.nan) + + # UB <- LB + IB / reduced_costs - OB / reduced_costs + # m = 1 / reduced_costs; b = - OB / reduced_costs + # IB (inner bound) changes between iterations ... + current_upper_tightening = self._best_ub_update_function_slope * test_inner_bound + self._best_ub_update_function_intercept + # similar + current_lower_tightening = self._best_lb_update_function_slope * test_inner_bound + self._best_lb_update_function_intercept + + # We'll keep the best bound update function based on which tightened the best for the current value + self._best_ub_update_function_slope = np.where(tighten_upper < current_upper_tightening, 1.0 / nonzero_rc, self._best_ub_update_function_slope) + self._best_ub_update_function_intercept = np.where(tighten_upper < current_upper_tightening, tighten_upper - (test_inner_bound / nonzero_rc), self._best_ub_update_function_intercept) + + # We'll keep the best bound update function based on which tightened the best for the current value + self._best_lb_update_function_slope = np.where(tighten_lower > current_lower_tightening, 1.0 / nonzero_rc, self._best_lb_update_function_slope) + self._best_lb_update_function_intercept = np.where(tighten_lower > current_lower_tightening, tighten_lower - (test_inner_bound / nonzero_rc), self._best_lb_update_function_intercept) + + def extract_and_store_updated_nonant_bounds(self, new_dual=False): + # if it's time, don't bother + if self.got_kill_signal(): + return + + self.receive_innerbounds() + + if math.isinf(self.BestInnerBound): + # can do anything with no bound + return + + if self._inner_bound_update(self.BestInnerBound, self._best_inner_bound): + self._best_inner_bound = self.BestInnerBound + elif not new_dual: + # no better inner bound than last time; and no dual update either + return + + tighten_upper = self._best_ub_update_function_slope * self._best_inner_bound + self._best_ub_update_function_intercept + tighten_lower = self._best_lb_update_function_slope * self._best_inner_bound + self._best_lb_update_function_intercept + + # UB <- LB + (IB - OB) / reduced_cost --> tightening upper bound + tighten_upper += self._nonant_lower_bounds + # LB <- UB + (IB - OB) / reduced_cost --> tightening lower bound + tighten_lower += self._nonant_upper_bounds + + # max of existing lower and new lower, ignoring nan's + np.fmax(tighten_lower, self._nonant_lower_bounds, out=self._nonant_lower_bounds) + + # min of existing upper and new upper, ignoring nan's + np.fmin(tighten_upper, self._nonant_upper_bounds, out=self._nonant_upper_bounds) + + # ceiling of lower bounds for integer variables + np.ceil(self._nonant_lower_bounds, out=self._nonant_lower_bounds, where=self._integer_variable_where) + # floor of upper bounds for integer variables + np.floor(self._nonant_upper_bounds, out=self._nonant_upper_bounds, where=self._integer_variable_where) + + self.put_send_buffer( + self.send_buffers[Field.NONANT_LOWER_BOUNDS], + Field.NONANT_LOWER_BOUNDS, + ) + self.put_send_buffer( + self.send_buffers[Field.NONANT_UPPER_BOUNDS], + Field.NONANT_UPPER_BOUNDS, + ) + + def update_nonant_bounds(self): + bounds_modified = 0 + send_buf = self.send_buffers[Field.NONANT_LOWER_BOUNDS] + for s in self.opt.local_scenarios.values(): + for ci, (ndn_i, xvar) in enumerate(s._mpisppy_data.nonant_indices.items()): + xvarlb = xvar.lb + if xvarlb is None: + xvarlb = -np.inf + if send_buf[ci] > xvarlb: + # global_toc(f"{self.__class__.__name__}: tightened {xvar.name} lower bound from {xvar.lb} to {send_buf[ci]}", self.cylinder_rank == 0) + xvar.lb = send_buf[ci] + bounds_modified += 1 + send_buf = self.send_buffers[Field.NONANT_UPPER_BOUNDS] + for s in self.opt.local_scenarios.values(): + for ci, (ndn_i, xvar) in enumerate(s._mpisppy_data.nonant_indices.items()): + xvarub = xvar.ub + if xvarub is None: + xvarub = np.inf + if send_buf[ci] < xvarub: + # global_toc(f"{self.__class__.__name__}: tightened {xvar.name} upper bound from {xvar.ub} to {send_buf[ci]}", self.cylinder_rank == 0) + xvar.ub = send_buf[ci] + bounds_modified += 1 + + bounds_modified /= len(self.opt.local_scenarios) + + if bounds_modified > 0: + global_toc(f"{self.__class__.__name__}: tightened {int(bounds_modified)} variable bounds", self.cylinder_rank == 0) + + def do_while_waiting_for_new_Ws(self, need_solution, warmstart=False): + # RC is an LP, should not need a warmstart with _value + super().do_while_waiting_for_new_Ws(need_solution=need_solution, warmstart=False) + # might as well see if a tighter upper bound has come along + self.extract_and_store_updated_nonant_bounds(new_dual=False) + self.update_nonant_bounds() + def main(self): # need the solution for ReducedCostsSpoke super().main(need_solution=True) + + +def _lb_generator(var_iterable): + for v in var_iterable: + lb = v.lb + if lb is None: + yield -np.inf + yield lb + + +def _ub_generator(var_iterable): + for v in var_iterable: + ub = v.ub + if ub is None: + yield np.inf + yield ub diff --git a/mpisppy/cylinders/spcommunicator.py b/mpisppy/cylinders/spcommunicator.py index 00be40ead..339059ec4 100644 --- a/mpisppy/cylinders/spcommunicator.py +++ b/mpisppy/cylinders/spcommunicator.py @@ -19,13 +19,17 @@ Separate hub and spoke classes for memory/window management? """ -import numpy as np import abc import time +import logging +import numpy as np +from math import inf -from mpisppy import MPI +from mpisppy import MPI, global_toc from mpisppy.cylinders.spwindow import Field, FieldLengths, SPWindow +logger = logging.getLogger(__name__) + def communicator_array(size): arr = np.empty(size+1, dtype='d') arr[:] = np.nan @@ -116,7 +120,8 @@ class SPCommunicator: or expects to receive from another SPCommunicator object. """ send_fields = () - receive_fields = () + receive_fields = (Field.OBJECTIVE_INNER_BOUND, Field.OBJECTIVE_OUTER_BOUND, + Field.NONANT_LOWER_BOUNDS, Field.NONANT_UPPER_BOUNDS,) def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, communicators, options=None): self.fullcomm = fullcomm @@ -138,7 +143,7 @@ def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, communic # Common fields for spokes and hubs self.receive_buffers = {} self.send_buffers = {} - # key: Field, value: list of (strata_rank, SPComm) with that Field + # key: Field, value: list of (strata_rank, SPComm, buffer) with that Field self.receive_field_spcomms = {} # setup FieldLengths which calculates @@ -152,6 +157,13 @@ def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, communic # the SPBase object self.opt.spcomm = self + # for communicating with bounders + self.latest_ib_char = None + self.latest_ob_char = None + self.last_ib_idx = None + self.last_ob_idx = None + self.initialize_bound_values() + return def _make_key(self, field: Field, origin: int): @@ -382,3 +394,104 @@ def get_receive_buffer(self, else: buf._is_new = False return False + + def receive_nonant_bounds(self): + """ receive the bounds on the nonanticipative variables based on + Field.NONANT_LOWER_BOUNDS and Field.NONANT_UPPER_BOUNDS. Updates the + NONANT_LOWER_BOUNDS and NONANT_UPPER_BOUNDS buffers, and if new, + updates the corresponding Pyomo nonant variables + """ + bounds_modified = 0 + for idx, _, recv_buf in self.receive_field_spcomms[Field.NONANT_LOWER_BOUNDS]: + is_new = self.get_receive_buffer(recv_buf, Field.NONANT_LOWER_BOUNDS, idx) + if not is_new: + continue + for s in self.opt.local_scenarios.values(): + for ci, (ndn_i, xvar) in enumerate(s._mpisppy_data.nonant_indices.items()): + xvarlb = xvar.lb + if xvarlb is None: + xvarlb = -inf + if recv_buf[ci] > xvarlb: + global_toc(f"{self.__class__.__name__}: tightened {xvar.name} lower bound from {xvar.lb} to {recv_buf[ci]}, value: {xvar.value}", self.cylinder_rank == 0) + xvar.lb = recv_buf[ci] + bounds_modified += 1 + for idx, _, recv_buf in self.receive_field_spcomms[Field.NONANT_UPPER_BOUNDS]: + is_new = self.get_receive_buffer(recv_buf, Field.NONANT_UPPER_BOUNDS, idx) + if not is_new: + continue + for s in self.opt.local_scenarios.values(): + for ci, (ndn_i, xvar) in enumerate(s._mpisppy_data.nonant_indices.items()): + xvarub = xvar.ub + if xvarub is None: + xvarub = inf + if recv_buf[ci] < xvarub: + global_toc(f"{self.__class__.__name__}: tightened {xvar.name} upper bound from {xvar.ub} to {recv_buf[ci]}, value: {xvar.value}", self.cylinder_rank == 0) + xvar.ub = recv_buf[ci] + bounds_modified += 1 + + bounds_modified /= len(self.opt.local_scenarios) + + if bounds_modified > 0: + global_toc(f"{self.__class__.__name__}: tightened {int(bounds_modified)} variable bounds", self.cylinder_rank == 0) + + def receive_innerbounds(self): + """ Get inner bounds from inner bound providers + """ + logger.debug(f"{self.__class__.__name__} is trying to receive from InnerBounds") + for idx, cls, recv_buf in self.receive_field_spcomms[Field.OBJECTIVE_INNER_BOUND]: + is_new = self.get_receive_buffer(recv_buf, Field.OBJECTIVE_INNER_BOUND, idx) + if is_new: + bound = recv_buf[0] + logger.debug(f"new InnerBound from {cls.__name__} in {self.__class__.__name__}, {bound=}") + self.BestInnerBound = self.InnerBoundUpdate(bound, cls, idx) + logger.debug(f"{self.__class__.__name__} back from InnerBounds") + + def receive_outerbounds(self): + """ Get outer bounds from outer bound providers + """ + logger.debug(f"{self.__class__.__name__} is trying to receive from OuterBounds") + for idx, cls, recv_buf in self.receive_field_spcomms[Field.OBJECTIVE_OUTER_BOUND]: + is_new = self.get_receive_buffer(recv_buf, Field.OBJECTIVE_OUTER_BOUND, idx) + if is_new: + bound = recv_buf[0] + logger.debug(f"new OuterBound from {cls.__name__} in {self.__class__.__name__}, {bound=}") + self.BestOuterBound = self.OuterBoundUpdate(bound, cls, idx) + logger.debug(f"{self.__class__.__name__} back from OuterBounds") + + def OuterBoundUpdate(self, new_bound, cls=None, idx=None, char='*'): + current_bound = self.BestOuterBound + if self._outer_bound_update(new_bound, current_bound): + if cls is None: + self.latest_ob_char = char + self.last_ob_idx = self.strata_rank + else: + self.latest_ob_char = cls.converger_spoke_char + self.last_ob_idx = idx + return new_bound + else: + return current_bound + + def InnerBoundUpdate(self, new_bound, cls=None, idx=None, char='*'): + current_bound = self.BestInnerBound + if self._inner_bound_update(new_bound, current_bound): + if cls is None: + self.latest_ib_char = char + self.last_ib_idx = self.strata_rank + else: + self.latest_ib_char = cls.converger_spoke_char + self.last_ib_idx = idx + return new_bound + else: + return current_bound + + def initialize_bound_values(self): + if self.opt.is_minimizing: + self.BestInnerBound = inf + self.BestOuterBound = -inf + self._inner_bound_update = lambda new, old : (new < old) + self._outer_bound_update = lambda new, old : (new > old) + else: + self.BestInnerBound = -inf + self.BestOuterBound = inf + self._inner_bound_update = lambda new, old : (new > old) + self._outer_bound_update = lambda new, old : (new < old) diff --git a/mpisppy/cylinders/spwindow.py b/mpisppy/cylinders/spwindow.py index b5e288778..3d8eb8e78 100644 --- a/mpisppy/cylinders/spwindow.py +++ b/mpisppy/cylinders/spwindow.py @@ -27,6 +27,8 @@ class Field(enum.IntEnum): SCENARIO_REDUCED_COST=201 CROSS_SCENARIO_CUT=300 CROSS_SCENARIO_COST=400 + NONANT_LOWER_BOUNDS=500 + NONANT_UPPER_BOUNDS=501 WHOLE=1_000_000 @@ -47,6 +49,8 @@ class Field(enum.IntEnum): Field.SCENARIO_REDUCED_COST : _field_length_components.local_nonant_length, Field.CROSS_SCENARIO_CUT : _field_length_components.total_number_scenarios * (_field_length_components.total_number_nonants + 1 + 1), Field.CROSS_SCENARIO_COST : _field_length_components.total_number_scenarios * _field_length_components.total_number_scenarios, + Field.NONANT_LOWER_BOUNDS : _field_length_components.total_number_nonants, + Field.NONANT_UPPER_BOUNDS : _field_length_components.total_number_nonants, } diff --git a/mpisppy/extensions/reduced_costs_fixer.py b/mpisppy/extensions/reduced_costs_fixer.py index 2724f84c8..86208d8e5 100644 --- a/mpisppy/extensions/reduced_costs_fixer.py +++ b/mpisppy/extensions/reduced_costs_fixer.py @@ -25,11 +25,9 @@ def __init__(self, spobj): self.verbose = ph_options['verbose'] or rc_options['verbose'] self.debug = rc_options['debug'] - self._use_rc_bt = rc_options['use_rc_bt'] # reduced costs less than this in absolute value # will be considered 0 self.zero_rc_tol = rc_options['zero_rc_tol'] - self._use_rc_fixer = rc_options['use_rc_fixer'] self._rc_fixer_require_improving_lagrangian = rc_options.get('rc_fixer_require_improving_lagrangian', True) # Percentage of variables which are at the bound we will target # to fix. We never fix varibles with reduced costs less than @@ -48,10 +46,6 @@ def __init__(self, spobj): # TODO: This should be same as in rc spoke? self.bound_tol = rc_options['rc_bound_tol'] - if not (self._use_rc_bt or self._use_rc_fixer) and \ - self.opt.cylinder_rank == 0: - print("Warning: ReducedCostsFixer will be idle. Enable use_rc_bt or use_rc_fixer in options.") - self._heuristic_fixed_vars = 0 if spobj.is_minimizing: self._best_outer_bound = -float("inf") @@ -60,6 +54,8 @@ def __init__(self, spobj): self._best_outer_bound = float("inf") self._outer_bound_update = lambda new, old : (new < old) + self._current_reduced_costs = None + def _update_best_outer_bound(self, new_outer_bound): if self._outer_bound_update(new_outer_bound, self._best_outer_bound): self._best_outer_bound = new_outer_bound @@ -80,7 +76,7 @@ def pre_iter0(self): def iter0_post_solver_creation(self): self.fix_fraction_target = self._fix_fraction_target_pre_iter0 - if self._use_rc_fixer and self.fix_fraction_target > 0: + if self.fix_fraction_target > 0: # wait for the reduced costs if self.opt.cylinder_rank == 0 and self.verbose: print("Fixing based on reduced costs prior to iteration 0!") @@ -139,122 +135,21 @@ def sync_with_spokes(self, pre_iter0 = False): cls=ReducedCostsSpoke, idx=self.reduced_costs_spoke_index, ) - if not pre_iter0 and self._use_rc_bt: - self.reduced_costs_bounds_tightening(reduced_costs, this_outer_bound) - ## End if - if self._use_rc_fixer and self.fix_fraction_target > 0.0: - if is_new_outer_bound or not self._rc_fixer_require_improving_lagrangian: - self.reduced_costs_fixing(reduced_costs) - ## End if - ## End if + if is_new_outer_bound or not self._rc_fixer_require_improving_lagrangian: + self._current_reduced_costs = np.array(reduced_costs[:]) else: if self.opt.cylinder_rank == 0 and self.verbose: print("No new reduced costs!") ## End if ## End if + if self.fix_fraction_target > 0.0 and self._current_reduced_costs is not None: + # makes sense to run this every iteration because xbar can change!! + self.reduced_costs_fixing(self._current_reduced_costs) + ## End if return - def reduced_costs_bounds_tightening(self, reduced_costs, this_outer_bound): - - bounds_reduced_this_iter = 0 - inner_bound = self.opt.spcomm.BestInnerBound - outer_bound = this_outer_bound - is_minimizing = self.opt.is_minimizing - if np.isinf(inner_bound) or np.isinf(outer_bound): - if self.opt.cylinder_rank == 0 and self.verbose: - print("Bounds tightened by reduced cost: 0 (inner or outer bound not available)") - return - - for sub in self.opt.local_subproblems.values(): - persistent_solver = is_persistent(sub._solver_plugin) - for sn in sub.scen_list: - tightened_this_scenario = 0 - s = self.opt.local_scenarios[sn] - for ci, (ndn_i, xvar) in enumerate(s._mpisppy_data.nonant_indices.items()): - if ndn_i in self._modeler_fixed_nonants: - continue - this_expected_rc = reduced_costs[ci] - update_var = False - if np.isnan(this_expected_rc) or np.isinf(this_expected_rc): - continue - - if np.isclose(xvar.lb, xvar.ub): - continue - - # TODO: could simplify if/else blocks using sign variable - might reduce readability? - # alternatively, could move some blocks into functions - if is_minimizing: - # var at lb - if this_expected_rc > 0 + self.zero_rc_tol: - new_ub = xvar.lb + (inner_bound - outer_bound)/ this_expected_rc - old_ub = xvar.ub - if new_ub < old_ub: - if ndn_i in self._integer_nonants: - new_ub = np.floor(new_ub) - xvar.setub(new_ub) - else: - xvar.setub(new_ub) - if self.debug and self.opt.cylinder_rank == 0: - print(f"tightening ub of var {xvar.name} to {new_ub} from {old_ub}; reduced cost is {this_expected_rc}") - update_var = True - bounds_reduced_this_iter += 1 - tightened_this_scenario += 1 - # var at ub - elif this_expected_rc < 0 - self.zero_rc_tol: - new_lb = xvar.ub + (inner_bound - outer_bound)/ this_expected_rc - old_lb = xvar.lb - if new_lb > old_lb: - if ndn_i in self._integer_nonants: - new_lb = np.ceil(new_lb) - xvar.setlb(new_lb) - else: - xvar.setlb(new_lb) - if self.debug and self.opt.cylinder_rank == 0: - print(f"tightening lb of var {xvar.name} to {new_lb} from {old_lb}; reduced cost is {this_expected_rc}") - update_var = True - bounds_reduced_this_iter += 1 - tightened_this_scenario += 1 - # maximization - else: - # var at lb - if this_expected_rc < 0 - self.zero_rc_tol: - new_ub = xvar.lb - (outer_bound - inner_bound)/ this_expected_rc - old_ub = xvar.ub - if new_ub < old_ub: - if ndn_i in self._integer_nonants: - new_ub = np.floor(new_ub) - xvar.setub(new_ub) - else: - xvar.setub(new_ub) - if self.debug and self.opt.cylinder_rank == 0: - print(f"tightening ub of var {xvar.name} to {new_ub} from {old_ub}; reduced cost is {this_expected_rc}") - update_var = True - bounds_reduced_this_iter += 1 - # var at ub - elif this_expected_rc > 0 + self.zero_rc_tol: - new_lb = xvar.ub - (outer_bound - inner_bound)/ this_expected_rc - old_lb = xvar.lb - if new_lb > old_lb: - if ndn_i in self._integer_nonants: - new_lb = np.ceil(new_lb) - xvar.setlb(new_lb) - else: - xvar.setlb(new_lb) - if self.debug and self.opt.cylinder_rank == 0: - print(f"tightening lb of var {xvar.name} to {new_lb} from {old_lb}; reduced cost is {this_expected_rc}") - update_var = True - bounds_reduced_this_iter += 1 - - if update_var and persistent_solver: - sub._solver_plugin.update_var(xvar) - - total_bounds_tightened = bounds_reduced_this_iter / len(self.opt.local_scenarios) - if self.opt.cylinder_rank == 0 and self.verbose: - print(f"Bounds tightened by reduced cost: {int(round(total_bounds_tightened))}/{self.nonant_length}") - - def reduced_costs_fixing(self, reduced_costs, pre_iter0 = False): if np.all(np.isnan(reduced_costs)): @@ -307,12 +202,20 @@ def reduced_costs_fixing(self, reduced_costs, pre_iter0 = False): print(f"unfixing var {xvar.name}; not converged in LP-LR") else: # not nan, variable is converged in LP-LR if xvar.fixed: + xb = s._mpisppy_model.xbars[ndn_i].value if (this_expected_rc <= target): xvar.unfix() update_var = True raw_fixed_this_iter -= 1 if self.debug and self.opt.cylinder_rank == 0: print(f"unfixing var {xvar.name}; reduced cost is zero/below target in LP-LR") + # in case somebody else unfixs a variable in another rank... + if abs(xb - xvar.value) > self.bound_tol: + xvar.unfix() + update_var = True + raw_fixed_this_iter -= 1 + if self.debug and self.opt.cylinder_rank == 0: + print(f"unfixing var {xvar.name}; xbar differs from the fixed value") else: xb = s._mpisppy_model.xbars[ndn_i].value if (this_expected_rc >= target): diff --git a/mpisppy/utils/cfg_vanilla.py b/mpisppy/utils/cfg_vanilla.py index 93084529f..60648ebc9 100644 --- a/mpisppy/utils/cfg_vanilla.py +++ b/mpisppy/utils/cfg_vanilla.py @@ -336,12 +336,10 @@ def add_reduced_costs_fixer(hub_dict, hub_dict["opt_kwargs"]["options"]["rc_options"] = { "verbose": cfg.rc_verbose, "debug": cfg.rc_debug, - "use_rc_fixer": cfg.rc_fixer, "zero_rc_tol": cfg.rc_zero_tol, "fix_fraction_target_pre_iter0": cfg.rc_fix_fraction_pre_iter0, "fix_fraction_target_iter0": cfg.rc_fix_fraction_iter0, "fix_fraction_target_iterK": cfg.rc_fix_fraction_iterk, - "use_rc_bt": cfg.rc_bound_tightening, "rc_bound_tol": cfg.rc_bound_tol, "rc_fixer_require_improving_lagrangian": cfg.rc_fixer_require_improving_lagrangian, } diff --git a/mpisppy/utils/config.py b/mpisppy/utils/config.py index c8e44f3fb..b60102723 100644 --- a/mpisppy/utils/config.py +++ b/mpisppy/utils/config.py @@ -635,11 +635,6 @@ def reduced_costs_args(self): domain=float, default=0.0) - self.add_to_config('rc_bound_tightening', - description="use reduced cost bound tightening", - domain=bool, - default=True) - self.add_to_config('rc_bound_tol', description="tol to consider vars at bound", domain=float,