Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/libecalc/domain/process/process_solver/asv_solvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
IndividualASVPressureControlStrategy,
IndividualASVRateControlStrategy,
)
from libecalc.domain.process.process_solver.pressure_control.upstream_choke import (
UpstreamChokePressureControlStrategy,
UpstreamChokeRunner,
)
from libecalc.domain.process.process_solver.search_strategies import BinarySearchStrategy, ScipyRootFindingStrategy
from libecalc.domain.process.process_solver.solver import Solution
from libecalc.domain.process.process_solver.solvers.recirculation_solver import (
Expand Down Expand Up @@ -49,6 +53,7 @@ def __init__(
fluid_service: FluidService,
individual_asv_control: bool = True,
constant_pressure_ratio: bool = False,
upstream_choke: Choke | None = None,
downstream_choke: Choke | None = None,
) -> None:
self._shaft = shaft
Expand All @@ -57,6 +62,7 @@ def __init__(
self._root_finding_strategy = ScipyRootFindingStrategy()
self._individual_asv_control = individual_asv_control
self._constant_pressure_ratio = constant_pressure_ratio
self._upstream_choke = upstream_choke
self._downstream_choke = downstream_choke
self._anti_surge_strategy: AntiSurgeStrategy
self._recirculation_loops = (
Expand Down Expand Up @@ -97,7 +103,16 @@ def __init__(
)

# 2) Pressure control strategy (downstream choke if present, else ASV-based)
if downstream_choke is not None:
if upstream_choke is not None and downstream_choke is not None:
raise ValueError("Only one of upstream_choke or downstream_choke can be set.")

if upstream_choke is not None:
pressure_control_system = SerialProcessSystem(propagators=[upstream_choke, *self._recirculation_loops])
self._pressure_control_strategy = UpstreamChokePressureControlStrategy(
runner=UpstreamChokeRunner(process_system=pressure_control_system, upstream_choke=upstream_choke),
root_finding_strategy=self._root_finding_strategy,
)
elif downstream_choke is not None:
pressure_control_system = SerialProcessSystem(propagators=[*self._recirculation_loops, downstream_choke])
self._pressure_control_strategy = DownstreamChokePressureControlStrategy(
runner=DownstreamChokeRunner(process_system=pressure_control_system, downstream_choke=downstream_choke)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@


class DownstreamChokeRunner:
def __init__(self, *, process_system: ProcessSystem, downstream_choke: Choke):
def __init__(self, process_system: ProcessSystem, downstream_choke: Choke):
self._process_system = process_system
self._downstream_choke = downstream_choke

def run(self, *, inlet_stream: FluidStream, downstream_delta_pressure: float) -> FluidStream:
def run(self, inlet_stream: FluidStream, downstream_delta_pressure: float) -> FluidStream:
self._downstream_choke.set_pressure_change(pressure_change=downstream_delta_pressure)
return self._process_system.propagate_stream(inlet_stream=inlet_stream)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from dataclasses import dataclass

from libecalc.domain.process.compressor.core.train.utils.common import PRESSURE_CALCULATION_TOLERANCE
from libecalc.domain.process.entities.process_units.choke import Choke
from libecalc.domain.process.process_solver.boundary import Boundary
from libecalc.domain.process.process_solver.float_constraint import FloatConstraint
from libecalc.domain.process.process_solver.pressure_control.pressure_control_strategy import PressureControlStrategy
from libecalc.domain.process.process_solver.search_strategies import RootFindingStrategy
from libecalc.domain.process.process_solver.solvers.downstream_choke_solver import ChokeConfiguration
from libecalc.domain.process.process_solver.solvers.upstream_choke_solver import UpstreamChokeSolver
from libecalc.domain.process.process_system.process_system import ProcessSystem
from libecalc.domain.process.value_objects.fluid_stream import FluidStream


@dataclass
class UpstreamChokeRunner:
def __init__(self, process_system: ProcessSystem, upstream_choke: Choke):
self._process_system = process_system
self._upstream_choke = upstream_choke

def run(self, inlet_stream: FluidStream, upstream_delta_pressure: float) -> FluidStream:
self._upstream_choke.set_pressure_change(pressure_change=upstream_delta_pressure)
return self._process_system.propagate_stream(inlet_stream=inlet_stream)


class UpstreamChokePressureControlStrategy(PressureControlStrategy):
"""
Meet target outlet pressure at fixed speed by applying upstream choking when needed.

The strategy models pressure control by reducing suction pressure (upstream choking).
It computes a safe ΔP search interval from the inlet stream pressure at solve time,
then uses a root-finding solver to find the ΔP that makes outlet pressure match the target.
"""

def __init__(
self,
runner: "UpstreamChokeRunner",
root_finding_strategy: RootFindingStrategy,
):
self._runner = runner
self._root_finding_strategy = root_finding_strategy

def apply(self, target_pressure: FloatConstraint, inlet_stream: FluidStream) -> bool:
# Use a small margin to avoid evaluating exactly at the physical/numerical extremes:
# ΔP = 0 (no choke) and ΔP = inlet_pressure (zero/negative suction pressure).
delta_pressure_boundary = Boundary(
min=PRESSURE_CALCULATION_TOLERANCE,
max=inlet_stream.pressure_bara - PRESSURE_CALCULATION_TOLERANCE,
)

solver = UpstreamChokeSolver(
root_finding_strategy=self._root_finding_strategy,
target_pressure=target_pressure.value,
delta_pressure_boundary=delta_pressure_boundary,
)

def choke_func(config: ChokeConfiguration) -> FluidStream:
# The runner is responsible for interpreting upstream ΔP as reduced suction pressure
# seen by the downstream process system.
return self._runner.run(
inlet_stream=inlet_stream,
upstream_delta_pressure=config.delta_pressure,
)

solution = solver.solve(choke_func)

# Re-run with the chosen configuration to leave the choke/process state configured.
outlet_stream_after_choke = choke_func(solution.configuration)

return outlet_stream_after_choke.pressure_bara == target_pressure
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ def solve(self, func: Callable[[ChokeConfiguration], FluidStream]) -> Solution[C
# Don't use choke if outlet pressure is below target
return Solution(success=True, configuration=choke_configuration)

# Evaluate outlet pressure at maximum allowed upstream ΔP (within boundary).
max_cfg = ChokeConfiguration(delta_pressure=self._delta_pressure_boundary.max)
outlet_at_max_choke = func(max_cfg)

if outlet_at_max_choke.pressure_bara > self._target_pressure:
# If we are still above target even at max choking, then no solution exists within the boundary.
return Solution(success=False, configuration=max_cfg)

pressure_change = self._root_finding_strategy.find_root(
boundary=self._delta_pressure_boundary,
func=lambda x: func(ChokeConfiguration(delta_pressure=x)).pressure_bara - self._target_pressure,
Expand Down
54 changes: 54 additions & 0 deletions tests/libecalc/domain/process/process_solver/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest

from libecalc.domain.process.entities.shaft import VariableSpeedShaft
from libecalc.domain.process.process_solver.boundary import Boundary
from libecalc.domain.process.process_system.compressor_stage_process_unit import CompressorStageProcessUnit
from libecalc.domain.process.process_system.process_unit import ProcessUnitId, create_process_unit_id
from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream


class SpeedCompressorStage(CompressorStageProcessUnit):
"""
Test double that makes speed->pressure mapping deterministic:

outlet_pressure = inlet_pressure + shaft_speed

The capacity-related methods are implemented with wide limits to avoid
interfering with tests that focus on solver orchestration.
"""

def __init__(self, shaft: VariableSpeedShaft, fluid_service: FluidService):
self._id = create_process_unit_id()
self._shaft = shaft
self._fluid_service = fluid_service

def get_id(self) -> ProcessUnitId:
return self._id

def get_speed_boundary(self) -> Boundary:
return Boundary(min=200.0, max=600.0)

def get_maximum_standard_rate(self, inlet_stream: FluidStream) -> float:
# "Infinite" capacity for test purposes
return 1e30

def get_minimum_standard_rate(self, inlet_stream: FluidStream) -> float:
# "No minimum" for test purposes
return 0.0

def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream:
speed = self._shaft.get_speed()
return self._fluid_service.create_stream_from_standard_rate(
fluid_model=inlet_stream.fluid_model,
pressure_bara=inlet_stream.pressure_bara + speed,
standard_rate_m3_per_day=inlet_stream.standard_rate_sm3_per_day,
temperature_kelvin=inlet_stream.temperature_kelvin,
)


@pytest.fixture
def speed_compressor_stage_factory(fluid_service):
def create(shaft: VariableSpeedShaft):
return SpeedCompressorStage(shaft=shaft, fluid_service=fluid_service)

return create
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from libecalc.domain.process.process_solver.float_constraint import FloatConstraint
from libecalc.domain.process.process_solver.pressure_control.upstream_choke import (
UpstreamChokePressureControlStrategy,
UpstreamChokeRunner,
)


def test_upstream_choke_strategy_baseline_below_target_does_not_choke(
simple_process_unit_factory,
process_system_factory,
stream_factory,
choke_factory,
root_finding_strategy,
):
"""
Does not apply upstream choking when baseline outlet pressure is already below the target.
"""
upstream_choke = choke_factory()
process_system = process_system_factory(
process_units=[upstream_choke, simple_process_unit_factory(pressure_multiplier=1)],
)

runner = UpstreamChokeRunner(process_system=process_system, upstream_choke=upstream_choke)
strategy = UpstreamChokePressureControlStrategy(
runner=runner,
root_finding_strategy=root_finding_strategy,
)

inlet_stream = stream_factory(standard_rate_m3_per_day=1000, pressure_bara=50)
target_pressure = FloatConstraint(70.0, abs_tol=1e-12)

success = strategy.apply(target_pressure=target_pressure, inlet_stream=inlet_stream)
assert success is False

# Ensure choke is not applied (baseline-run uses delta_pressure=0.0)
outlet_stream_no_choke = runner.run(inlet_stream=inlet_stream, upstream_delta_pressure=0.0)
assert outlet_stream_no_choke.pressure_bara < target_pressure


def test_upstream_choke_strategy_baseline_above_target_chokes_to_target(
simple_process_unit_factory,
process_system_factory,
stream_factory,
choke_factory,
root_finding_strategy,
):
"""Applies upstream choking when baseline outlet pressure is above the target, so outlet meets target."""
upstream_choke = choke_factory()
process_system = process_system_factory(
process_units=[upstream_choke, simple_process_unit_factory(pressure_multiplier=1)],
)

runner = UpstreamChokeRunner(process_system=process_system, upstream_choke=upstream_choke)
strategy = UpstreamChokePressureControlStrategy(
runner=runner,
root_finding_strategy=root_finding_strategy,
)

inlet_stream = stream_factory(standard_rate_m3_per_day=1000, pressure_bara=100)
target_pressure = FloatConstraint(70.0, abs_tol=1e-12)

success = strategy.apply(target_pressure=target_pressure, inlet_stream=inlet_stream)
assert success is True

# Confirm outlet is at target with the choke state set by the strategy.
outlet_stream_after = process_system.propagate_stream(inlet_stream=inlet_stream)
assert outlet_stream_after.pressure_bara == target_pressure
Original file line number Diff line number Diff line change
@@ -1,56 +1,14 @@
from libecalc.domain.process.entities.shaft import VariableSpeedShaft
from libecalc.domain.process.process_solver.asv_solvers import ASVSolver
from libecalc.domain.process.process_solver.boundary import Boundary
from libecalc.domain.process.process_solver.float_constraint import FloatConstraint
from libecalc.domain.process.process_system.compressor_stage_process_unit import CompressorStageProcessUnit
from libecalc.domain.process.process_system.process_unit import ProcessUnitId, create_process_unit_id
from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream


class SpeedCompressorStage(CompressorStageProcessUnit):
"""
Test double that makes speed->pressure mapping deterministic:

outlet_pressure = inlet_pressure + shaft_speed

The capacity-related methods are implemented with wide limits to avoid
interfering with tests that focus on solver orchestration.
"""

def __init__(self, shaft: VariableSpeedShaft, fluid_service: FluidService):
self._id = create_process_unit_id()
self._shaft = shaft
self._fluid_service = fluid_service

def get_id(self) -> ProcessUnitId:
return self._id

def get_speed_boundary(self) -> Boundary:
return Boundary(min=200.0, max=600.0)

def get_maximum_standard_rate(self, inlet_stream: FluidStream) -> float:
# "Infinite" capacity for test purposes
return 1e30

def get_minimum_standard_rate(self, inlet_stream: FluidStream) -> float:
# "No minimum" for test purposes
return 0.0

def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream:
speed = self._shaft.get_speed()
return self._fluid_service.create_stream_from_standard_rate(
fluid_model=inlet_stream.fluid_model,
pressure_bara=inlet_stream.pressure_bara + speed,
standard_rate_m3_per_day=inlet_stream.standard_rate_sm3_per_day,
temperature_kelvin=inlet_stream.temperature_kelvin,
)


def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed(
stream_factory,
process_system_factory,
fluid_service,
choke_factory,
speed_compressor_stage_factory,
):
"""
If the target outlet pressure is lower than the outlet pressure at minimum speed, SpeedSolver returns
Expand All @@ -61,7 +19,7 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed
downstream_choke = choke_factory()

# One "compressor" stage that increases pressure with speed, followed by a downstream choke.
compressor = SpeedCompressorStage(shaft=shaft, fluid_service=fluid_service)
compressor = speed_compressor_stage_factory(shaft=shaft)

solver = ASVSolver(
shaft=shaft,
Expand All @@ -75,10 +33,10 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed

# At min speed=200 => baseline outlet = 25 + 200 = 225.
# Choose a target lower than 225 so SpeedSolver returns min speed with success=False.
target = FloatConstraint(50.0, abs_tol=1e-12)
target_pressure = FloatConstraint(50.0, abs_tol=1e-12)

speed_solution, recirculation_solutions = solver.find_asv_solution(
pressure_constraint=target,
pressure_constraint=target_pressure,
inlet_stream=inlet_stream,
)

Expand All @@ -96,4 +54,4 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed
process_system = process_system_factory(process_units=[*solver.get_recirculation_loops(), downstream_choke])
outlet = process_system.propagate_stream(inlet_stream=inlet_stream)

assert outlet.pressure_bara == target
assert outlet.pressure_bara == target_pressure
Loading
Loading