From 24d66759406ef6426a4c9915e0bf7b28a14aca55 Mon Sep 17 00:00:00 2001 From: Frode Helgetun Krogh <70878501+frodehk@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:08:10 +0100 Subject: [PATCH 1/2] refactor: introduce upstream choke pressure control strategy --- .../process/process_solver/asv_solvers.py | 19 ++++- .../pressure_control/downstream_choke.py | 4 +- .../pressure_control/upstream_choke.py | 70 +++++++++++++++++++ .../solvers/upstream_choke_solver.py | 8 +++ .../domain/process/process_solver/conftest.py | 54 ++++++++++++++ ...pstream_choke_pressure_control_strategy.py | 67 ++++++++++++++++++ .../test_asv_solver_with_downstream_choke.py | 52 ++------------ .../test_asv_solver_with_upstream_choke.py | 55 +++++++++++++++ 8 files changed, 278 insertions(+), 51 deletions(-) create mode 100644 src/libecalc/domain/process/process_solver/pressure_control/upstream_choke.py create mode 100644 tests/libecalc/domain/process/process_solver/conftest.py create mode 100644 tests/libecalc/domain/process/process_solver/pressure_control/test_upstream_choke_pressure_control_strategy.py create mode 100644 tests/libecalc/domain/process/process_solver/test_asv_solver_with_upstream_choke.py diff --git a/src/libecalc/domain/process/process_solver/asv_solvers.py b/src/libecalc/domain/process/process_solver/asv_solvers.py index ccb26a81d6..4b3454f4be 100644 --- a/src/libecalc/domain/process/process_solver/asv_solvers.py +++ b/src/libecalc/domain/process/process_solver/asv_solvers.py @@ -17,6 +17,10 @@ IndividualASVPressureControlStrategy, IndividualASVRateControlStrategy, ) +from libecalc.domain.process.process_solver.pressure_control.upstream_choke import ( + UpstreamChokePressureControlStrategy, + UpstreamChokeRunner, +) from libecalc.domain.process.process_solver.search_strategies import BinarySearchStrategy, ScipyRootFindingStrategy from libecalc.domain.process.process_solver.solver import Solution from libecalc.domain.process.process_solver.solvers.recirculation_solver import ( @@ -49,6 +53,7 @@ def __init__( fluid_service: FluidService, individual_asv_control: bool = True, constant_pressure_ratio: bool = False, + upstream_choke: Choke | None = None, downstream_choke: Choke | None = None, ) -> None: self._shaft = shaft @@ -57,6 +62,7 @@ def __init__( self._root_finding_strategy = ScipyRootFindingStrategy() self._individual_asv_control = individual_asv_control self._constant_pressure_ratio = constant_pressure_ratio + self._upstream_choke = upstream_choke self._downstream_choke = downstream_choke self._anti_surge_strategy: AntiSurgeStrategy self._recirculation_loops = ( @@ -97,8 +103,17 @@ def __init__( ) # 2) Pressure control strategy (downstream choke if present, else ASV-based) - if downstream_choke is not None: - pressure_control_system = SerialProcessSystem(propagators=[*self._recirculation_loops, downstream_choke]) + if upstream_choke is not None and downstream_choke is not None: + raise ValueError("Only one of upstream_choke or downstream_choke can be set.") + + if upstream_choke is not None: + pressure_control_system = ProcessSystem(process_units=[upstream_choke, *self._recirculation_loops]) + self._pressure_control_strategy = UpstreamChokePressureControlStrategy( + runner=UpstreamChokeRunner(process_system=pressure_control_system, upstream_choke=upstream_choke), + root_finding_strategy=self._root_finding_strategy, + ) + elif downstream_choke is not None: + pressure_control_system = ProcessSystem(process_units=[*self._recirculation_loops, downstream_choke]) self._pressure_control_strategy = DownstreamChokePressureControlStrategy( runner=DownstreamChokeRunner(process_system=pressure_control_system, downstream_choke=downstream_choke) ) diff --git a/src/libecalc/domain/process/process_solver/pressure_control/downstream_choke.py b/src/libecalc/domain/process/process_solver/pressure_control/downstream_choke.py index 27fefc8055..4237d4058f 100644 --- a/src/libecalc/domain/process/process_solver/pressure_control/downstream_choke.py +++ b/src/libecalc/domain/process/process_solver/pressure_control/downstream_choke.py @@ -10,11 +10,11 @@ class DownstreamChokeRunner: - def __init__(self, *, process_system: ProcessSystem, downstream_choke: Choke): + def __init__(self, process_system: ProcessSystem, downstream_choke: Choke): self._process_system = process_system self._downstream_choke = downstream_choke - def run(self, *, inlet_stream: FluidStream, downstream_delta_pressure: float) -> FluidStream: + def run(self, inlet_stream: FluidStream, downstream_delta_pressure: float) -> FluidStream: self._downstream_choke.set_pressure_change(pressure_change=downstream_delta_pressure) return self._process_system.propagate_stream(inlet_stream=inlet_stream) diff --git a/src/libecalc/domain/process/process_solver/pressure_control/upstream_choke.py b/src/libecalc/domain/process/process_solver/pressure_control/upstream_choke.py new file mode 100644 index 0000000000..f9fa34e504 --- /dev/null +++ b/src/libecalc/domain/process/process_solver/pressure_control/upstream_choke.py @@ -0,0 +1,70 @@ +from dataclasses import dataclass + +from libecalc.domain.process.compressor.core.train.utils.common import PRESSURE_CALCULATION_TOLERANCE +from libecalc.domain.process.entities.process_units.choke import Choke +from libecalc.domain.process.process_solver.boundary import Boundary +from libecalc.domain.process.process_solver.float_constraint import FloatConstraint +from libecalc.domain.process.process_solver.pressure_control.pressure_control_strategy import PressureControlStrategy +from libecalc.domain.process.process_solver.search_strategies import RootFindingStrategy +from libecalc.domain.process.process_solver.solvers.downstream_choke_solver import ChokeConfiguration +from libecalc.domain.process.process_solver.solvers.upstream_choke_solver import UpstreamChokeSolver +from libecalc.domain.process.process_system.process_system import ProcessSystem +from libecalc.domain.process.value_objects.fluid_stream import FluidStream + + +@dataclass +class UpstreamChokeRunner: + def __init__(self, process_system: ProcessSystem, upstream_choke: Choke): + self._process_system = process_system + self._upstream_choke = upstream_choke + + def run(self, inlet_stream: FluidStream, upstream_delta_pressure: float) -> FluidStream: + self._upstream_choke.set_pressure_change(pressure_change=upstream_delta_pressure) + return self._process_system.propagate_stream(inlet_stream=inlet_stream) + + +class UpstreamChokePressureControlStrategy(PressureControlStrategy): + """ + Meet target outlet pressure at fixed speed by applying upstream choking when needed. + + The strategy models pressure control by reducing suction pressure (upstream choking). + It computes a safe ΔP search interval from the inlet stream pressure at solve time, + then uses a root-finding solver to find the ΔP that makes outlet pressure match the target. + """ + + def __init__( + self, + runner: "UpstreamChokeRunner", + root_finding_strategy: RootFindingStrategy, + ): + self._runner = runner + self._root_finding_strategy = root_finding_strategy + + def apply(self, target_pressure: FloatConstraint, inlet_stream: FluidStream) -> bool: + # Use a small margin to avoid evaluating exactly at the physical/numerical extremes: + # ΔP = 0 (no choke) and ΔP = inlet_pressure (zero/negative suction pressure). + delta_pressure_boundary = Boundary( + min=PRESSURE_CALCULATION_TOLERANCE, + max=inlet_stream.pressure_bara - PRESSURE_CALCULATION_TOLERANCE, + ) + + solver = UpstreamChokeSolver( + root_finding_strategy=self._root_finding_strategy, + target_pressure=target_pressure.value, + delta_pressure_boundary=delta_pressure_boundary, + ) + + def choke_func(config: ChokeConfiguration) -> FluidStream: + # The runner is responsible for interpreting upstream ΔP as reduced suction pressure + # seen by the downstream process system. + return self._runner.run( + inlet_stream=inlet_stream, + upstream_delta_pressure=config.delta_pressure, + ) + + solution = solver.solve(choke_func) + + # Re-run with the chosen configuration to leave the choke/process state configured. + outlet_stream_after_choke = choke_func(solution.configuration) + + return outlet_stream_after_choke.pressure_bara == target_pressure diff --git a/src/libecalc/domain/process/process_solver/solvers/upstream_choke_solver.py b/src/libecalc/domain/process/process_solver/solvers/upstream_choke_solver.py index 382b65be5e..bbc63b40d9 100644 --- a/src/libecalc/domain/process/process_solver/solvers/upstream_choke_solver.py +++ b/src/libecalc/domain/process/process_solver/solvers/upstream_choke_solver.py @@ -26,6 +26,14 @@ def solve(self, func: Callable[[ChokeConfiguration], FluidStream]) -> Solution[C # Don't use choke if outlet pressure is below target return Solution(success=True, configuration=choke_configuration) + # Evaluate outlet pressure at maximum allowed upstream ΔP (within boundary). + max_cfg = ChokeConfiguration(delta_pressure=self._delta_pressure_boundary.max) + outlet_at_max_choke = func(max_cfg) + + if outlet_at_max_choke.pressure_bara > self._target_pressure: + # If we are still above target even at max choking, then no solution exists within the boundary. + return Solution(success=False, configuration=max_cfg) + pressure_change = self._root_finding_strategy.find_root( boundary=self._delta_pressure_boundary, func=lambda x: func(ChokeConfiguration(delta_pressure=x)).pressure_bara - self._target_pressure, diff --git a/tests/libecalc/domain/process/process_solver/conftest.py b/tests/libecalc/domain/process/process_solver/conftest.py new file mode 100644 index 0000000000..e9a36290b6 --- /dev/null +++ b/tests/libecalc/domain/process/process_solver/conftest.py @@ -0,0 +1,54 @@ +import pytest + +from libecalc.domain.process.entities.shaft import VariableSpeedShaft +from libecalc.domain.process.process_solver.boundary import Boundary +from libecalc.domain.process.process_system.compressor_stage_process_unit import CompressorStageProcessUnit +from libecalc.domain.process.process_system.process_unit import ProcessUnitId, create_process_unit_id +from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream + + +class SpeedCompressorStage(CompressorStageProcessUnit): + """ + Test double that makes speed->pressure mapping deterministic: + + outlet_pressure = inlet_pressure + shaft_speed + + The capacity-related methods are implemented with wide limits to avoid + interfering with tests that focus on solver orchestration. + """ + + def __init__(self, shaft: VariableSpeedShaft, fluid_service: FluidService): + self._id = create_process_unit_id() + self._shaft = shaft + self._fluid_service = fluid_service + + def get_id(self) -> ProcessUnitId: + return self._id + + def get_speed_boundary(self) -> Boundary: + return Boundary(min=200.0, max=600.0) + + def get_maximum_standard_rate(self, inlet_stream: FluidStream) -> float: + # "Infinite" capacity for test purposes + return 1e30 + + def get_minimum_standard_rate(self, inlet_stream: FluidStream) -> float: + # "No minimum" for test purposes + return 0.0 + + def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream: + speed = self._shaft.get_speed() + return self._fluid_service.create_stream_from_standard_rate( + fluid_model=inlet_stream.fluid_model, + pressure_bara=inlet_stream.pressure_bara + speed, + standard_rate_m3_per_day=inlet_stream.standard_rate_sm3_per_day, + temperature_kelvin=inlet_stream.temperature_kelvin, + ) + + +@pytest.fixture +def speed_compressor_stage_factory(fluid_service): + def create(shaft: VariableSpeedShaft): + return SpeedCompressorStage(shaft=shaft, fluid_service=fluid_service) + + return create diff --git a/tests/libecalc/domain/process/process_solver/pressure_control/test_upstream_choke_pressure_control_strategy.py b/tests/libecalc/domain/process/process_solver/pressure_control/test_upstream_choke_pressure_control_strategy.py new file mode 100644 index 0000000000..f5c60a42ad --- /dev/null +++ b/tests/libecalc/domain/process/process_solver/pressure_control/test_upstream_choke_pressure_control_strategy.py @@ -0,0 +1,67 @@ +from libecalc.domain.process.process_solver.float_constraint import FloatConstraint +from libecalc.domain.process.process_solver.pressure_control.upstream_choke import ( + UpstreamChokePressureControlStrategy, + UpstreamChokeRunner, +) + + +def test_upstream_choke_strategy_baseline_below_target_does_not_choke( + simple_process_unit_factory, + process_system_factory, + stream_factory, + choke_factory, + root_finding_strategy, +): + """ + Does not apply upstream choking when baseline outlet pressure is already below the target. + """ + upstream_choke = choke_factory() + process_system = process_system_factory( + process_units=[upstream_choke, simple_process_unit_factory(pressure_multiplier=1)], + ) + + runner = UpstreamChokeRunner(process_system=process_system, upstream_choke=upstream_choke) + strategy = UpstreamChokePressureControlStrategy( + runner=runner, + root_finding_strategy=root_finding_strategy, + ) + + inlet_stream = stream_factory(standard_rate_m3_per_day=1000, pressure_bara=50) + target_pressure = FloatConstraint(70.0, abs_tol=1e-12) + + success = strategy.apply(target_pressure=target_pressure, inlet_stream=inlet_stream) + assert success is False + + # Ensure choke is not applied (baseline-run uses delta_pressure=0.0) + outlet_stream_no_choke = runner.run(inlet_stream=inlet_stream, upstream_delta_pressure=0.0) + assert outlet_stream_no_choke.pressure_bara < target_pressure + + +def test_upstream_choke_strategy_baseline_above_target_chokes_to_target( + simple_process_unit_factory, + process_system_factory, + stream_factory, + choke_factory, + root_finding_strategy, +): + """Applies upstream choking when baseline outlet pressure is above the target, so outlet meets target.""" + upstream_choke = choke_factory() + process_system = process_system_factory( + process_units=[upstream_choke, simple_process_unit_factory(pressure_multiplier=1)], + ) + + runner = UpstreamChokeRunner(process_system=process_system, upstream_choke=upstream_choke) + strategy = UpstreamChokePressureControlStrategy( + runner=runner, + root_finding_strategy=root_finding_strategy, + ) + + inlet_stream = stream_factory(standard_rate_m3_per_day=1000, pressure_bara=100) + target_pressure = FloatConstraint(70.0, abs_tol=1e-12) + + success = strategy.apply(target_pressure=target_pressure, inlet_stream=inlet_stream) + assert success is True + + # Confirm outlet is at target with the choke state set by the strategy. + outlet_stream_after = process_system.propagate_stream(inlet_stream=inlet_stream) + assert outlet_stream_after.pressure_bara == target_pressure diff --git a/tests/libecalc/domain/process/process_solver/test_asv_solver_with_downstream_choke.py b/tests/libecalc/domain/process/process_solver/test_asv_solver_with_downstream_choke.py index 2ea3997c12..7489578ef3 100644 --- a/tests/libecalc/domain/process/process_solver/test_asv_solver_with_downstream_choke.py +++ b/tests/libecalc/domain/process/process_solver/test_asv_solver_with_downstream_choke.py @@ -1,49 +1,6 @@ from libecalc.domain.process.entities.shaft import VariableSpeedShaft from libecalc.domain.process.process_solver.asv_solvers import ASVSolver -from libecalc.domain.process.process_solver.boundary import Boundary from libecalc.domain.process.process_solver.float_constraint import FloatConstraint -from libecalc.domain.process.process_system.compressor_stage_process_unit import CompressorStageProcessUnit -from libecalc.domain.process.process_system.process_unit import ProcessUnitId, create_process_unit_id -from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream - - -class SpeedCompressorStage(CompressorStageProcessUnit): - """ - Test double that makes speed->pressure mapping deterministic: - - outlet_pressure = inlet_pressure + shaft_speed - - The capacity-related methods are implemented with wide limits to avoid - interfering with tests that focus on solver orchestration. - """ - - def __init__(self, shaft: VariableSpeedShaft, fluid_service: FluidService): - self._id = create_process_unit_id() - self._shaft = shaft - self._fluid_service = fluid_service - - def get_id(self) -> ProcessUnitId: - return self._id - - def get_speed_boundary(self) -> Boundary: - return Boundary(min=200.0, max=600.0) - - def get_maximum_standard_rate(self, inlet_stream: FluidStream) -> float: - # "Infinite" capacity for test purposes - return 1e30 - - def get_minimum_standard_rate(self, inlet_stream: FluidStream) -> float: - # "No minimum" for test purposes - return 0.0 - - def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream: - speed = self._shaft.get_speed() - return self._fluid_service.create_stream_from_standard_rate( - fluid_model=inlet_stream.fluid_model, - pressure_bara=inlet_stream.pressure_bara + speed, - standard_rate_m3_per_day=inlet_stream.standard_rate_sm3_per_day, - temperature_kelvin=inlet_stream.temperature_kelvin, - ) def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed( @@ -51,6 +8,7 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed process_system_factory, fluid_service, choke_factory, + speed_compressor_stage_factory, ): """ If the target outlet pressure is lower than the outlet pressure at minimum speed, SpeedSolver returns @@ -61,7 +19,7 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed downstream_choke = choke_factory() # One "compressor" stage that increases pressure with speed, followed by a downstream choke. - compressor = SpeedCompressorStage(shaft=shaft, fluid_service=fluid_service) + compressor = speed_compressor_stage_factory(shaft=shaft) solver = ASVSolver( shaft=shaft, @@ -75,10 +33,10 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed # At min speed=200 => baseline outlet = 25 + 200 = 225. # Choose a target lower than 225 so SpeedSolver returns min speed with success=False. - target = FloatConstraint(50.0, abs_tol=1e-12) + target_pressure = FloatConstraint(50.0, abs_tol=1e-12) speed_solution, recirculation_solutions = solver.find_asv_solution( - pressure_constraint=target, + pressure_constraint=target_pressure, inlet_stream=inlet_stream, ) @@ -96,4 +54,4 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed process_system = process_system_factory(process_units=[*solver.get_recirculation_loops(), downstream_choke]) outlet = process_system.propagate_stream(inlet_stream=inlet_stream) - assert outlet.pressure_bara == target + assert outlet.pressure_bara == target_pressure diff --git a/tests/libecalc/domain/process/process_solver/test_asv_solver_with_upstream_choke.py b/tests/libecalc/domain/process/process_solver/test_asv_solver_with_upstream_choke.py new file mode 100644 index 0000000000..237f358a37 --- /dev/null +++ b/tests/libecalc/domain/process/process_solver/test_asv_solver_with_upstream_choke.py @@ -0,0 +1,55 @@ +from libecalc.domain.process.entities.shaft import VariableSpeedShaft +from libecalc.domain.process.process_solver.asv_solvers import ASVSolver +from libecalc.domain.process.process_solver.float_constraint import FloatConstraint + + +def test_asv_solver_applies_upstream_choke_when_speed_solution_is_at_min_speed( + stream_factory, + process_system_factory, + fluid_service, + choke_factory, + speed_compressor_stage_factory, +): + """ + If the outlet pressure is higher than the target outlet pressure at minimum speed, SpeedSolver returns + success=False and selects the minimum speed. ASVSolver should still attempt pressure control, and with an + upstream choke it should be able to meet the target. + """ + shaft = VariableSpeedShaft() + upstream_choke = choke_factory() + + # One "compressor" stage that increases pressure with speed, with an upstream choke. + compressor = speed_compressor_stage_factory(shaft=shaft) + + solver = ASVSolver( + shaft=shaft, + compressors=[compressor], + fluid_service=fluid_service, + individual_asv_control=False, + upstream_choke=upstream_choke, + ) + + inlet_stream = stream_factory(standard_rate_m3_per_day=1000, pressure_bara=25.0) + + # At min speed=200 => baseline outlet pressure = 25 + 200 = 225. + # Choose a target lower than 225 (outlet > target) so SpeedSolver returns min speed with success=False. + target_pressure = FloatConstraint(210, abs_tol=1e-12) + + speed_solution, recirculation_solutions = solver.find_asv_solution( + pressure_constraint=target_pressure, + inlet_stream=inlet_stream, + ) + + # SpeedSolver could not meet the target pressure within the speed boundary, + # so it returned the minimum speed as the best feasible speed. + assert speed_solution.success is False + assert speed_solution.configuration.speed == compressor.get_speed_boundary().min + + # But overall solver should succeed via upstream choke pressure control. + assert recirculation_solutions[0].success is True + + # Verify that upstream choking actually brings outlet down to target. + process_system = process_system_factory(process_units=[upstream_choke, *solver.get_recirculation_loops()]) + outlet = process_system.propagate_stream(inlet_stream=inlet_stream) + + assert outlet.pressure_bara == target_pressure From 388a137e9b32320663e01e21a19406becd2be1b6 Mon Sep 17 00:00:00 2001 From: Frode Helgetun Krogh <70878501+frodehk@users.noreply.github.com> Date: Fri, 13 Mar 2026 10:39:45 +0100 Subject: [PATCH 2/2] chore: rebase --- src/libecalc/domain/process/process_solver/asv_solvers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libecalc/domain/process/process_solver/asv_solvers.py b/src/libecalc/domain/process/process_solver/asv_solvers.py index 4b3454f4be..535c6584d9 100644 --- a/src/libecalc/domain/process/process_solver/asv_solvers.py +++ b/src/libecalc/domain/process/process_solver/asv_solvers.py @@ -107,13 +107,13 @@ def __init__( raise ValueError("Only one of upstream_choke or downstream_choke can be set.") if upstream_choke is not None: - pressure_control_system = ProcessSystem(process_units=[upstream_choke, *self._recirculation_loops]) + pressure_control_system = SerialProcessSystem(propagators=[upstream_choke, *self._recirculation_loops]) self._pressure_control_strategy = UpstreamChokePressureControlStrategy( runner=UpstreamChokeRunner(process_system=pressure_control_system, upstream_choke=upstream_choke), root_finding_strategy=self._root_finding_strategy, ) elif downstream_choke is not None: - pressure_control_system = ProcessSystem(process_units=[*self._recirculation_loops, downstream_choke]) + pressure_control_system = SerialProcessSystem(propagators=[*self._recirculation_loops, downstream_choke]) self._pressure_control_strategy = DownstreamChokePressureControlStrategy( runner=DownstreamChokeRunner(process_system=pressure_control_system, downstream_choke=downstream_choke) )