Skip to content

Commit 6420b79

Browse files
committed
refactor: introduce upstream choke pressure control strategy
1 parent 4c404c2 commit 6420b79

8 files changed

Lines changed: 277 additions & 50 deletions

File tree

src/libecalc/domain/process/process_solver/asv_solvers.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
IndividualASVPressureControlStrategy,
1818
IndividualASVRateControlStrategy,
1919
)
20+
from libecalc.domain.process.process_solver.pressure_control.upstream_choke import (
21+
UpstreamChokePressureControlStrategy,
22+
UpstreamChokeRunner,
23+
)
2024
from libecalc.domain.process.process_solver.search_strategies import BinarySearchStrategy, ScipyRootFindingStrategy
2125
from libecalc.domain.process.process_solver.solver import Solution
2226
from libecalc.domain.process.process_solver.solvers.recirculation_solver import (
@@ -49,6 +53,7 @@ def __init__(
4953
fluid_service: FluidService,
5054
individual_asv_control: bool = True,
5155
constant_pressure_ratio: bool = False,
56+
upstream_choke: Choke | None = None,
5257
downstream_choke: Choke | None = None,
5358
) -> None:
5459
self._shaft = shaft
@@ -57,6 +62,7 @@ def __init__(
5762
self._root_finding_strategy = ScipyRootFindingStrategy()
5863
self._individual_asv_control = individual_asv_control
5964
self._constant_pressure_ratio = constant_pressure_ratio
65+
self._upstream_choke = upstream_choke
6066
self._downstream_choke = downstream_choke
6167
self._anti_surge_strategy: AntiSurgeStrategy
6268
self._recirculation_loops = (
@@ -97,7 +103,16 @@ def __init__(
97103
)
98104

99105
# 2) Pressure control strategy (downstream choke if present, else ASV-based)
100-
if downstream_choke is not None:
106+
if upstream_choke is not None and downstream_choke is not None:
107+
raise ValueError("Only one of upstream_choke or downstream_choke can be set.")
108+
109+
if upstream_choke is not None:
110+
pressure_control_system = ProcessSystem(process_units=[upstream_choke, *self._recirculation_loops])
111+
self._pressure_control_strategy = UpstreamChokePressureControlStrategy(
112+
runner=UpstreamChokeRunner(process_system=pressure_control_system, upstream_choke=upstream_choke),
113+
root_finding_strategy=self._root_finding_strategy,
114+
)
115+
elif downstream_choke is not None:
101116
pressure_control_system = ProcessSystem(process_units=[*self._recirculation_loops, downstream_choke])
102117
self._pressure_control_strategy = DownstreamChokePressureControlStrategy(
103118
runner=DownstreamChokeRunner(process_system=pressure_control_system, downstream_choke=downstream_choke)

src/libecalc/domain/process/process_solver/pressure_control/downstream_choke.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010

1111

1212
class DownstreamChokeRunner:
13-
def __init__(self, *, process_system: ProcessSystem, downstream_choke: Choke):
13+
def __init__(self, process_system: ProcessSystem, downstream_choke: Choke):
1414
self._process_system = process_system
1515
self._downstream_choke = downstream_choke
1616

17-
def run(self, *, inlet_stream: FluidStream, downstream_delta_pressure: float) -> FluidStream:
17+
def run(self, inlet_stream: FluidStream, downstream_delta_pressure: float) -> FluidStream:
1818
self._downstream_choke.set_pressure_change(pressure_change=downstream_delta_pressure)
1919
return self._process_system.propagate_stream(inlet_stream=inlet_stream)
2020

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
from dataclasses import dataclass
2+
3+
from libecalc.domain.process.compressor.core.train.utils.common import PRESSURE_CALCULATION_TOLERANCE
4+
from libecalc.domain.process.entities.process_units.choke import Choke
5+
from libecalc.domain.process.process_solver.boundary import Boundary
6+
from libecalc.domain.process.process_solver.float_constraint import FloatConstraint
7+
from libecalc.domain.process.process_solver.pressure_control.pressure_control_strategy import PressureControlStrategy
8+
from libecalc.domain.process.process_solver.search_strategies import RootFindingStrategy
9+
from libecalc.domain.process.process_solver.solvers.downstream_choke_solver import ChokeConfiguration
10+
from libecalc.domain.process.process_solver.solvers.upstream_choke_solver import UpstreamChokeSolver
11+
from libecalc.domain.process.process_system.process_system import ProcessSystem
12+
from libecalc.domain.process.value_objects.fluid_stream import FluidStream
13+
14+
15+
@dataclass
16+
class UpstreamChokeRunner:
17+
def __init__(self, process_system: ProcessSystem, upstream_choke: Choke):
18+
self._process_system = process_system
19+
self._upstream_choke = upstream_choke
20+
21+
def run(self, inlet_stream: FluidStream, upstream_delta_pressure: float) -> FluidStream:
22+
self._upstream_choke.set_pressure_change(pressure_change=upstream_delta_pressure)
23+
return self._process_system.propagate_stream(inlet_stream=inlet_stream)
24+
25+
26+
class UpstreamChokePressureControlStrategy(PressureControlStrategy):
27+
"""
28+
Meet target outlet pressure at fixed speed by applying upstream choking when needed.
29+
30+
The strategy models pressure control by reducing suction pressure (upstream choking).
31+
It computes a safe ΔP search interval from the inlet stream pressure at solve time,
32+
then uses a root-finding solver to find the ΔP that makes outlet pressure match the target.
33+
"""
34+
35+
def __init__(
36+
self,
37+
runner: "UpstreamChokeRunner",
38+
root_finding_strategy: RootFindingStrategy,
39+
):
40+
self._runner = runner
41+
self._root_finding_strategy = root_finding_strategy
42+
43+
def apply(self, target_pressure: FloatConstraint, inlet_stream: FluidStream) -> bool:
44+
# Use a small margin to avoid evaluating exactly at the physical/numerical extremes:
45+
# ΔP = 0 (no choke) and ΔP = inlet_pressure (zero/negative suction pressure).
46+
delta_pressure_boundary = Boundary(
47+
min=PRESSURE_CALCULATION_TOLERANCE,
48+
max=inlet_stream.pressure_bara - PRESSURE_CALCULATION_TOLERANCE,
49+
)
50+
51+
solver = UpstreamChokeSolver(
52+
root_finding_strategy=self._root_finding_strategy,
53+
target_pressure=target_pressure.value,
54+
delta_pressure_boundary=delta_pressure_boundary,
55+
)
56+
57+
def choke_func(config: ChokeConfiguration) -> FluidStream:
58+
# The runner is responsible for interpreting upstream ΔP as reduced suction pressure
59+
# seen by the downstream process system.
60+
return self._runner.run(
61+
inlet_stream=inlet_stream,
62+
upstream_delta_pressure=config.delta_pressure,
63+
)
64+
65+
solution = solver.solve(choke_func)
66+
67+
# Re-run with the chosen configuration to leave the choke/process state configured.
68+
outlet_stream_after_choke = choke_func(solution.configuration)
69+
70+
return outlet_stream_after_choke.pressure_bara == target_pressure

src/libecalc/domain/process/process_solver/solvers/upstream_choke_solver.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ def solve(self, func: Callable[[ChokeConfiguration], FluidStream]) -> Solution[C
2626
# Don't use choke if outlet pressure is below target
2727
return Solution(success=True, configuration=choke_configuration)
2828

29+
# Evaluate outlet pressure at maximum allowed upstream ΔP (within boundary).
30+
max_cfg = ChokeConfiguration(delta_pressure=self._delta_pressure_boundary.max)
31+
outlet_at_max_choke = func(max_cfg)
32+
33+
if outlet_at_max_choke.pressure_bara > self._target_pressure:
34+
# If we are still above target even at max choking, then no solution exists within the boundary.
35+
return Solution(success=False, configuration=max_cfg)
36+
2937
pressure_change = self._root_finding_strategy.find_root(
3038
boundary=self._delta_pressure_boundary,
3139
func=lambda x: func(ChokeConfiguration(delta_pressure=x)).pressure_bara - self._target_pressure,
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import pytest
2+
3+
from libecalc.domain.process.entities.shaft import VariableSpeedShaft
4+
from libecalc.domain.process.process_solver.boundary import Boundary
5+
from libecalc.domain.process.process_system.compressor_stage_process_unit import CompressorStageProcessUnit
6+
from libecalc.domain.process.process_system.process_unit import ProcessUnitId, create_process_unit_id
7+
from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream
8+
9+
10+
class SpeedCompressorStage(CompressorStageProcessUnit):
11+
"""
12+
Test double that makes speed->pressure mapping deterministic:
13+
14+
outlet_pressure = inlet_pressure + shaft_speed
15+
16+
The capacity-related methods are implemented with wide limits to avoid
17+
interfering with tests that focus on solver orchestration.
18+
"""
19+
20+
def __init__(self, shaft: VariableSpeedShaft, fluid_service: FluidService):
21+
self._id = create_process_unit_id()
22+
self._shaft = shaft
23+
self._fluid_service = fluid_service
24+
25+
def get_id(self) -> ProcessUnitId:
26+
return self._id
27+
28+
def get_speed_boundary(self) -> Boundary:
29+
return Boundary(min=200.0, max=600.0)
30+
31+
def get_maximum_standard_rate(self, inlet_stream: FluidStream) -> float:
32+
# "Infinite" capacity for test purposes
33+
return 1e30
34+
35+
def get_minimum_standard_rate(self, inlet_stream: FluidStream) -> float:
36+
# "No minimum" for test purposes
37+
return 0.0
38+
39+
def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream:
40+
speed = self._shaft.get_speed()
41+
return self._fluid_service.create_stream_from_standard_rate(
42+
fluid_model=inlet_stream.fluid_model,
43+
pressure_bara=inlet_stream.pressure_bara + speed,
44+
standard_rate_m3_per_day=inlet_stream.standard_rate_sm3_per_day,
45+
temperature_kelvin=inlet_stream.temperature_kelvin,
46+
)
47+
48+
49+
@pytest.fixture
50+
def speed_compressor_stage_factory(fluid_service):
51+
def create(shaft: VariableSpeedShaft):
52+
return SpeedCompressorStage(shaft=shaft, fluid_service=fluid_service)
53+
54+
return create
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from libecalc.domain.process.process_solver.float_constraint import FloatConstraint
2+
from libecalc.domain.process.process_solver.pressure_control.upstream_choke import (
3+
UpstreamChokePressureControlStrategy,
4+
UpstreamChokeRunner,
5+
)
6+
7+
8+
def test_upstream_choke_strategy_baseline_below_target_does_not_choke(
9+
simple_process_unit_factory,
10+
process_system_factory,
11+
stream_factory,
12+
choke_factory,
13+
root_finding_strategy,
14+
):
15+
"""
16+
Does not apply upstream choking when baseline outlet pressure is already below the target.
17+
"""
18+
upstream_choke = choke_factory()
19+
process_system = process_system_factory(
20+
process_units=[upstream_choke, simple_process_unit_factory(pressure_multiplier=1)],
21+
)
22+
23+
runner = UpstreamChokeRunner(process_system=process_system, upstream_choke=upstream_choke)
24+
strategy = UpstreamChokePressureControlStrategy(
25+
runner=runner,
26+
root_finding_strategy=root_finding_strategy,
27+
)
28+
29+
inlet_stream = stream_factory(standard_rate_m3_per_day=1000, pressure_bara=50)
30+
target_pressure = FloatConstraint(70.0, abs_tol=1e-12)
31+
32+
success = strategy.apply(target_pressure=target_pressure, inlet_stream=inlet_stream)
33+
assert success is False
34+
35+
# Ensure choke is not applied (baseline-run uses delta_pressure=0.0)
36+
outlet_stream_no_choke = runner.run(inlet_stream=inlet_stream, upstream_delta_pressure=0.0)
37+
assert outlet_stream_no_choke.pressure_bara < target_pressure
38+
39+
40+
def test_upstream_choke_strategy_baseline_above_target_chokes_to_target(
41+
simple_process_unit_factory,
42+
process_system_factory,
43+
stream_factory,
44+
choke_factory,
45+
root_finding_strategy,
46+
):
47+
"""Applies upstream choking when baseline outlet pressure is above the target, so outlet meets target."""
48+
upstream_choke = choke_factory()
49+
process_system = process_system_factory(
50+
process_units=[upstream_choke, simple_process_unit_factory(pressure_multiplier=1)],
51+
)
52+
53+
runner = UpstreamChokeRunner(process_system=process_system, upstream_choke=upstream_choke)
54+
strategy = UpstreamChokePressureControlStrategy(
55+
runner=runner,
56+
root_finding_strategy=root_finding_strategy,
57+
)
58+
59+
inlet_stream = stream_factory(standard_rate_m3_per_day=1000, pressure_bara=100)
60+
target_pressure = FloatConstraint(70.0, abs_tol=1e-12)
61+
62+
success = strategy.apply(target_pressure=target_pressure, inlet_stream=inlet_stream)
63+
assert success is True
64+
65+
# Confirm outlet is at target with the choke state set by the strategy.
66+
outlet_stream_after = process_system.propagate_stream(inlet_stream=inlet_stream)
67+
assert outlet_stream_after.pressure_bara == target_pressure
Lines changed: 5 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,14 @@
11
from libecalc.domain.process.entities.shaft import VariableSpeedShaft
22
from libecalc.domain.process.process_solver.asv_solvers import ASVSolver
3-
from libecalc.domain.process.process_solver.boundary import Boundary
43
from libecalc.domain.process.process_solver.float_constraint import FloatConstraint
5-
from libecalc.domain.process.process_system.compressor_stage_process_unit import CompressorStageProcessUnit
6-
from libecalc.domain.process.process_system.process_unit import ProcessUnitId, create_process_unit_id
7-
from libecalc.domain.process.value_objects.fluid_stream import FluidService, FluidStream
8-
9-
10-
class SpeedCompressorStage(CompressorStageProcessUnit):
11-
"""
12-
Test double that makes speed->pressure mapping deterministic:
13-
14-
outlet_pressure = inlet_pressure + shaft_speed
15-
16-
The capacity-related methods are implemented with wide limits to avoid
17-
interfering with tests that focus on solver orchestration.
18-
"""
19-
20-
def __init__(self, shaft: VariableSpeedShaft, fluid_service: FluidService):
21-
self._id = create_process_unit_id()
22-
self._shaft = shaft
23-
self._fluid_service = fluid_service
24-
25-
def get_id(self) -> ProcessUnitId:
26-
return self._id
27-
28-
def get_speed_boundary(self) -> Boundary:
29-
return Boundary(min=200.0, max=600.0)
30-
31-
def get_maximum_standard_rate(self, inlet_stream: FluidStream) -> float:
32-
# "Infinite" capacity for test purposes
33-
return 1e30
34-
35-
def get_minimum_standard_rate(self, inlet_stream: FluidStream) -> float:
36-
# "No minimum" for test purposes
37-
return 0.0
38-
39-
def propagate_stream(self, inlet_stream: FluidStream) -> FluidStream:
40-
speed = self._shaft.get_speed()
41-
return self._fluid_service.create_stream_from_standard_rate(
42-
fluid_model=inlet_stream.fluid_model,
43-
pressure_bara=inlet_stream.pressure_bara + speed,
44-
standard_rate_m3_per_day=inlet_stream.standard_rate_sm3_per_day,
45-
temperature_kelvin=inlet_stream.temperature_kelvin,
46-
)
474

485

496
def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed(
507
stream_factory,
518
process_system_factory,
529
fluid_service,
5310
choke_factory,
11+
speed_compressor_stage_factory,
5412
):
5513
"""
5614
If the target outlet pressure is lower than the outlet pressure at minimum speed, SpeedSolver returns
@@ -61,7 +19,7 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed
6119
downstream_choke = choke_factory()
6220

6321
# One "compressor" stage that increases pressure with speed, followed by a downstream choke.
64-
compressor = SpeedCompressorStage(shaft=shaft, fluid_service=fluid_service)
22+
compressor = speed_compressor_stage_factory(shaft=shaft)
6523

6624
solver = ASVSolver(
6725
shaft=shaft,
@@ -75,10 +33,10 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed
7533

7634
# At min speed=200 => baseline outlet = 25 + 200 = 225.
7735
# Choose a target lower than 225 so SpeedSolver returns min speed with success=False.
78-
target = FloatConstraint(50.0, abs_tol=1e-12)
36+
target_pressure = FloatConstraint(50.0, abs_tol=1e-12)
7937

8038
speed_solution, recirculation_solutions = solver.find_asv_solution(
81-
pressure_constraint=target,
39+
pressure_constraint=target_pressure,
8240
inlet_stream=inlet_stream,
8341
)
8442

@@ -96,4 +54,4 @@ def test_asv_solver_applies_downstream_choke_when_speed_solution_is_at_min_speed
9654
process_system = process_system_factory(process_units=[*solver.get_recirculation_loops(), downstream_choke])
9755
outlet = process_system.propagate_stream(inlet_stream=inlet_stream)
9856

99-
assert outlet.pressure_bara == target
57+
assert outlet.pressure_bara == target_pressure

0 commit comments

Comments
 (0)