diff --git a/tests/saitests/probe/iteration_outcome.py b/tests/saitests/probe/iteration_outcome.py new file mode 100644 index 00000000000..8a103def524 --- /dev/null +++ b/tests/saitests/probe/iteration_outcome.py @@ -0,0 +1,70 @@ +""" +Iteration Outcome Enum - Unified Result Type for Probing Iterations + +This module defines the IterationOutcome enum that represents the complete +outcome of a single probing iteration, replacing the previous (detected, success) +two-parameter approach with a single, semantically clear enum value. + +Design Rationale: +- Simplifies API: one parameter instead of two boolean-like parameters +- Type-safe: Enum prevents invalid combinations +- Self-documenting: Each value clearly describes what happened +- Matches output: Values correspond directly to markdown table 'Check' column + +Note on SKIPPED: +- SKIPPED is NOT a result from executor.check() - it indicates the algorithm + decided not to call check() because the precision target was already met. +- This is a valid iteration outcome that should be reported to the observer. +- It's included here because IterationOutcome represents "what happened in this + iteration", not just "what did check() return". +""" + +from enum import Enum + + +class IterationOutcome(Enum): + """ + Outcome of a single probing iteration + + This enum represents the complete outcome of an iteration, including: + 1. Cases where executor.check() was called and returned a result + 2. Cases where check was intentionally skipped by the algorithm + + Values directly correspond to the 'Check' column in markdown table output. + + Mapping from old API: + REACHED ← detected=True, success=True + UNREACHED ← detected=False, success=True + FAILED ← detected=any, success=False + SKIPPED ← detected=None, success=True (new: check not executed) + """ + + # executor.check() called -> threshold was triggered + REACHED = "reached" + + # executor.check() called -> threshold was NOT triggered + UNREACHED = "unreached" + + # executor.check() called -> verification failed (inconsistent results) + FAILED = "failed" + + # executor.check() NOT called -> precision already satisfied, no probe needed + SKIPPED = "skipped" + + @classmethod + def from_check_result(cls, detected: bool, success: bool) -> "IterationOutcome": + """ + Convert legacy (detected, success) parameters to IterationOutcome + + This helper method supports gradual migration from the old API. + + Args: + detected: True if threshold was triggered, False if not + success: True if verification completed without errors + + Returns: + Corresponding IterationOutcome value + """ + if not success: + return cls.FAILED + return cls.REACHED if detected else cls.UNREACHED diff --git a/tests/saitests/probe/lower_bound_probing_algorithm.py b/tests/saitests/probe/lower_bound_probing_algorithm.py new file mode 100644 index 00000000000..ee7975ad423 --- /dev/null +++ b/tests/saitests/probe/lower_bound_probing_algorithm.py @@ -0,0 +1,143 @@ +""" +Lower Bound Probing Algorithm - Unified Implementation + +Generic lower bound detection algorithm that works with any probing type +(PFC Xoff, Ingress Drop, etc.) through the ProbingExecutorProtocol interface. + +Phase 2 Strategy: +- Start from upper_bound/2 as initial value +- Logarithmically reduce (/2) until threshold dismissed +- Single verification attempt for speed optimization +- Leverages upper bound result from Phase 1 + +Key principles: +1. Pure algorithm logic - no hardware/platform dependencies +2. Executor-agnostic through protocol interface +3. Logarithmic reduction for rapid convergence +4. Algorithm/Executor/Observer separation for clean testing +""" + +import sys +from typing import Optional, Tuple + +# Import model setup for both production and testing environments +if __package__ in (None, ""): + import os + _this_dir = os.path.dirname(os.path.abspath(__file__)) + _saitests_dir = os.path.dirname(_this_dir) + if _saitests_dir not in sys.path: + sys.path.insert(0, _saitests_dir) + __package__ = "probe" + +from probing_executor_protocol import ProbingExecutorProtocol +from probing_observer import ProbingObserver +from iteration_outcome import IterationOutcome + + +class LowerBoundProbingAlgorithm: + """ + Unified Lower Bound Detection Algorithm + + Implements Phase 2: Lower Bound Detection using logarithmic reduction (/2) + until threshold is dismissed, providing the lower boundary for subsequent phases. + + This algorithm works with ANY executor implementing ProbingExecutorProtocol: + - PfcxoffProbingExecutor + - IngressDropProbingExecutor + - MockExecutors + - Future executor types + + Strategy: + - Start from upper_bound/2 + - Logarithmically reduce (/2) until threshold dismissed + - Single verification for speed + - Safety limit to prevent infinite loops + """ + + def __init__(self, executor: ProbingExecutorProtocol, observer: ProbingObserver, + verification_attempts: int = 1): + """ + Initialize lower bound probing algorithm + + Args: + executor: Any executor implementing ProbingExecutorProtocol + observer: Result tracking and reporting (unified ProbingObserver) + verification_attempts: How many times to repeat the same check and require consistency + """ + self.executor = executor + self.observer = observer + self.verification_attempts = verification_attempts + + def run(self, src_port: int, dst_port: int, upper_bound: int, + start_value: int = None, **traffic_keys) -> Tuple[Optional[int], float]: + """ + Run lower bound detection algorithm + + Args: + src_port: Source port for traffic generation + dst_port: Destination port for threshold detection + upper_bound: Upper bound discovered from Phase 1 + start_value: Optional starting value for lower bound search (optimization). + If provided, skip the normal upper_bound/2 calculation and start from this value. + **traffic_keys: Traffic identification keys (e.g., pg=3, queue=5) + Useful when we know a value that definitely won't trigger the threshold. + For example, for Ingress Drop, use (pfc_xoff_threshold - 1) since Drop >= XOFF. + + Returns: + Tuple[Optional[int], float]: (lower_bound, phase_time) or (None, 0.0) on failure + """ + try: + # Prepare ports for threshold probing + self.executor.prepare(src_port, dst_port) + + # Phase 2: Lower Bound Detection using logarithmic reduction (/2) + # OPTIMIZATION: Use start_value if provided, otherwise default to upper_bound/2 + if start_value is not None: + current = start_value + else: + current = upper_bound // 2 + iteration = 0 + max_iterations = 20 # Safety limit + phase_time = 0.0 # Track cumulative phase time + + while iteration < max_iterations and current >= 1: + iteration += 1 + + # Add search window information for Phase 2 (no lower bound yet, only upper) + self.observer.on_iteration_start( + iteration, current, None, upper_bound, + "init" if iteration == 1 else "/2" + ) + + # Phase 2: use a single verification attempt for speed + success, detected = self.executor.check( + src_port, dst_port, current, attempts=self.verification_attempts, + iteration=iteration, **traffic_keys + ) + + iteration_time, phase_time = self.observer.on_iteration_complete( + iteration, current, IterationOutcome.from_check_result(detected, success) + ) + + if not success: + self.observer.on_error(f"Lower bound verification failed at iteration {iteration}") + return (None, phase_time) + + if not detected: + # Threshold dismissed - lower bound found + return (current, phase_time) + else: + # Continue logarithmic reduction + if current <= 1: + # Cannot reduce below 1 — lower bound is 1 + # (threshold triggered even at minimum packet count) + return (1, phase_time) + current = current // 2 + + self.observer.on_error( + "Lower bound detection exceeded maximum iterations") + return (None, phase_time) + + except Exception as e: + self.observer.on_error(f"Lower bound detection algorithm execution failed: {e}") + return (None, 0.0) diff --git a/tests/saitests/probe/probing_executor_protocol.py b/tests/saitests/probe/probing_executor_protocol.py new file mode 100644 index 00000000000..d569991a0d4 --- /dev/null +++ b/tests/saitests/probe/probing_executor_protocol.py @@ -0,0 +1,76 @@ +""" +Probing Executor Protocol - Unified Interface + +Defines the standard interface that all threshold probing executors must implement. +This protocol enables algorithm code to be completely executor-agnostic. + +Design Philosophy: +- Define common interface without implementation +- Enable algorithm reuse across different probing types (PFC Xoff, Ingress Drop, etc.) +- Support both physical device and mock executors +- Type-safe with Python's Protocol (structural subtyping) + +Usage: + def my_algorithm(executor: ProbingExecutorProtocol): + executor.prepare(src, dst) + success, detected = executor.check(src, dst, value) + # Works with ANY executor implementing this protocol +""" + +from typing import Protocol, Tuple, runtime_checkable + + +@runtime_checkable +class ProbingExecutorProtocol(Protocol): + """ + Protocol defining the standard executor interface for threshold probing + + All probing executors (PfcxoffProbingExecutor, IngressDropProbingExecutor, etc.) + must implement these methods to be compatible with unified algorithms. + + This is a structural protocol - classes don't need to explicitly inherit from it. + If a class has these methods with matching signatures, it automatically satisfies + the protocol. + """ + + def prepare(self, src_port: int, dst_port: int) -> None: + """ + Prepare ports for threshold detection + + Ensures clean buffer state before probing begins. + Typically involves draining buffers and setting up congestion conditions. + + Args: + src_port: Source port for traffic generation + dst_port: Destination port for threshold detection + """ + ... + + def check(self, src_port: int, dst_port: int, value: int, + attempts: int = 1, drain_buffer: bool = True, + iteration: int = 0, **traffic_keys) -> Tuple[bool, bool]: + """ + Check if threshold is reached at given value + + Standard 5-step verification process: + 1. Port preparation (optional via drain_buffer) + 2. Baseline measurement + 3. Traffic injection + 4. Wait for counter refresh + 5. Threshold detection + + Args: + src_port: Source port for traffic generation + dst_port: Destination port for threshold detection + value: Packet count to test + attempts: Number of verification attempts for consistency + drain_buffer: Whether to drain buffer before testing + iteration: Current iteration number (for metrics tracking) + **traffic_keys: Traffic identification keys (e.g., pg=3, queue=5) + + Returns: + Tuple[success, detected]: + - success: True if verification completed without errors + - detected: True if threshold was triggered at this value + """ + ... diff --git a/tests/saitests/probe/probing_result.py b/tests/saitests/probe/probing_result.py new file mode 100644 index 00000000000..68d2f66b197 --- /dev/null +++ b/tests/saitests/probe/probing_result.py @@ -0,0 +1,95 @@ +""" +Probing Result Data Class + +Defines standardized return type for all probing test cases. +Using dataclass to ensure consistent structure and type safety. + +Design principles: +1. Unified format: All probing tests return ThresholdResult +2. Type safety: Dataclass enforces field types +3. Range/Point unification: Point is special case of Range (lower == upper) +4. Nullable: Use Optional for failure cases + +Usage: +- PfcXoffProbing: Returns ThresholdResult with PFC XOFF threshold +- IngressDropProbing: Returns ThresholdResult with Ingress Drop threshold +- HeadroomPoolProbing: Returns ThresholdResult with total pool size +""" + +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class ThresholdResult: + """ + Unified threshold detection result for all probing types. + + Used by: + - PfcXoffProbing: threshold = PFC XOFF threshold + - IngressDropProbing: threshold = Ingress Drop threshold + - HeadroomPoolProbing: threshold = total headroom pool size + + Attributes: + lower_bound: Lower bound of threshold range (or exact point if lower == upper) + upper_bound: Upper bound of threshold range (or exact point if lower == upper) + success: Whether detection succeeded + phase_time: Time spent in this phase (seconds), set by algorithm from observer + + Notes: + - For range: lower_bound < upper_bound + - For point: lower_bound == upper_bound (precise detection) + - For failure: lower_bound = upper_bound = None, success = False + """ + lower_bound: Optional[int] + upper_bound: Optional[int] + success: bool + phase_time: float = 0.0 # Time in seconds for this phase + + @classmethod + def from_bounds(cls, lower: Optional[int], upper: Optional[int]) -> 'ThresholdResult': + """Create ThresholdResult from lower/upper bounds.""" + success = lower is not None and upper is not None + return cls(lower_bound=lower, upper_bound=upper, success=success) + + @classmethod + def failed(cls) -> 'ThresholdResult': + """Create a failed result.""" + return cls(lower_bound=None, upper_bound=None, success=False) + + @property + def is_point(self) -> bool: + """Check if result is a precise point (lower == upper).""" + return (self.success and self.lower_bound is not None + and self.upper_bound is not None + and self.lower_bound == self.upper_bound) + + @property + def is_range(self) -> bool: + """Check if result is a range (lower < upper).""" + return (self.success and self.lower_bound is not None + and self.upper_bound is not None + and self.lower_bound < self.upper_bound) + + @property + def value(self) -> Optional[int]: + """Get threshold value (for point) or lower bound (for range).""" + return self.lower_bound + + @property + def candidate(self) -> Optional[int]: + """Get candidate threshold (midpoint for range, exact value for point).""" + if not self.success: + return None + if self.is_point: + return self.lower_bound + # For range: return midpoint + return (self.lower_bound + self.upper_bound) // 2 + + def __repr__(self) -> str: + if not self.success: + return "ThresholdResult(FAILED)" + elif self.is_point: + return f"ThresholdResult(point={self.lower_bound})" + else: + return f"ThresholdResult(range=[{self.lower_bound}, {self.upper_bound}])" diff --git a/tests/saitests/probe/threshold_point_probing_algorithm.py b/tests/saitests/probe/threshold_point_probing_algorithm.py new file mode 100644 index 00000000000..524fbba1c53 --- /dev/null +++ b/tests/saitests/probe/threshold_point_probing_algorithm.py @@ -0,0 +1,150 @@ +""" +Threshold Point Probing Algorithm - Unified Implementation + +Generic precise threshold point detection algorithm that works with any probing type +(PFC Xoff, Ingress Drop, etc.) through the ProbingExecutorProtocol interface. + +This module implements precise threshold detection through step-by-step +packet increment within a known threshold range. Unlike the three-phase discovery +algorithm which finds a range, this algorithm finds the exact threshold point. + +Design principles: +1. Pure algorithm logic - no hardware/platform dependencies +2. Executor-agnostic through protocol interface +3. Step-by-step increment from lower_bound to upper_bound +4. Stops at first threshold trigger point +5. Performance optimization - incremental packet sending strategy +""" + +import sys +from typing import Optional, Tuple + +# Import model setup for both production and testing environments +if __package__ in (None, ""): + import os + _this_dir = os.path.dirname(os.path.abspath(__file__)) + _saitests_dir = os.path.dirname(_this_dir) + if _saitests_dir not in sys.path: + sys.path.insert(0, _saitests_dir) + __package__ = "probe" + +from probing_executor_protocol import ProbingExecutorProtocol +from probing_observer import ProbingObserver +from iteration_outcome import IterationOutcome + + +class ThresholdPointProbingAlgorithm: + """ + Unified Precise Threshold Point Detection Algorithm + + Implements precise threshold point detection through: + 1. Step-by-step increment from lower_bound to upper_bound + 2. 1-packet increment per iteration + 3. Stops at first threshold trigger + 4. Performance optimized packet sending + + This algorithm works with ANY executor implementing ProbingExecutorProtocol: + - PfcxoffProbingExecutor + - IngressDropProbingExecutor + - MockExecutors + - Future executor types + + Key features: + - Precise point detection (not range) + - Single verification per step (configurable) + - Optimized packet sending strategy + - Failure detection if no threshold found in range + + Note on step_size: + When step_size > 1, the returned threshold point has ±step_size + tolerance. For example, with step_size=2, the algorithm checks + 101, 103, 105... — if 103 triggers, the actual threshold could + be 102 (skipped). Default step_size=1 gives exact results. + """ + + def __init__(self, executor: ProbingExecutorProtocol, observer: ProbingObserver, + verification_attempts: int = 1, + step_size: int = 1): + """ + Initialize threshold point probing algorithm + + Args: + executor: Any executor implementing ProbingExecutorProtocol + observer: Unified ProbingObserver for Phase 4 (threshold_point) + verification_attempts: How many times to repeat each check and require consistency + step_size: Step increment size (default 1 for exact results; values > 1 + trade precision for speed — returned point has ±step_size tolerance) + """ + self.executor = executor + self.observer = observer + self.verification_attempts = verification_attempts + self.step_size = step_size + + def run(self, src_port: int, dst_port: int, lower_bound: int, + upper_bound: int, **traffic_keys) -> Tuple[Optional[int], Optional[int], float]: + """ + Probe for precise threshold point within known range + + Args: + src_port: Source port for sending traffic + dst_port: Destination port for threshold detection + lower_bound: Starting point for step-by-step search + upper_bound: End point for search range + **traffic_keys: Traffic identification keys (e.g., pg=3, queue=5) + + Returns: + Tuple[lower_bound, upper_bound, phase_time] where: + (point, point, time): precise threshold found with phase execution time + (None, None, 0.0): not found or error + """ + try: + # Step 1: Prepare ports for precise detection + self.executor.prepare(src_port, dst_port) + + # Step 2: Step-by-step search from lower_bound to upper_bound + # Use incremental packet sending for performance optimization + # Start from lower_bound+1 since lower_bound is confirmed unreached (skip known value) + step = self.step_size + phase_time = 0.0 # Track cumulative phase time + drain_buffer = True # First iteration always drains; reset to True on failure + + for iteration, current_packets in enumerate(range(lower_bound + 1, upper_bound + 1, step), start=1): + # Full send after drain, incremental otherwise + send_value = current_packets if drain_buffer else step + attempts = self.verification_attempts if drain_buffer else 1 + + self.observer.on_iteration_start( + iteration, current_packets, current_packets, upper_bound, + "init" if iteration == 1 else f"+{step}" + ) + + success, detected = self.executor.check( + src_port, dst_port, + value=send_value, + attempts=attempts, + drain_buffer=drain_buffer, + iteration=iteration, + **traffic_keys + ) + + iteration_time, phase_time = self.observer.on_iteration_complete( + iteration, current_packets, IterationOutcome.from_check_result(detected, success) + ) + + if not success: + # Buffer state unknown after failure — drain on next iteration + drain_buffer = True + continue + + drain_buffer = False # Successful — switch to incremental mode + + if detected: + precise_threshold = current_packets + return (precise_threshold, precise_threshold, phase_time) + + # Step 3: No threshold found in range - algorithm failure + return (None, None, phase_time) + + except Exception as e: + self.observer.on_error(f"Threshold point detection error: {e}") + return (None, None, 0.0) diff --git a/tests/saitests/probe/threshold_range_probing_algorithm.py b/tests/saitests/probe/threshold_range_probing_algorithm.py new file mode 100644 index 00000000000..e9a43052c48 --- /dev/null +++ b/tests/saitests/probe/threshold_range_probing_algorithm.py @@ -0,0 +1,234 @@ +""" +Threshold Range Probing Algorithm - Unified Implementation + +Generic precision range detection algorithm that works with any probing type +(PFC Xoff, Ingress Drop, etc.) through the ProbingExecutorProtocol interface. + +Phase 3 Strategy: +- Start with range from Phase 2 (lower_bound to upper_bound) +- Binary search with adaptive termination (5% precision OR fixed range limit) +- Stack-based backtracking with anti-oscillation nudge for noise resilience +- Multiple verification attempts for noise-resilient detection + +Anti-Oscillation Backtrack Design: + When a child range fails verification, we pop back to its parent range. + Without adjustment, the parent would produce the same midpoint and same + child — causing infinite oscillation. To avoid this, we nudge the parent + range boundary in the direction that makes the failing move less aggressive. + + The nudge direction depends on whether the parent's last successful check + was 'reached' (threshold triggered → searched left) or 'unreached' + (threshold not triggered → searched right): + + Scenario 1: Parent unreached → searched right → child FAIL + Parent's right-move was too aggressive. Nudge parent_start left + (decrease) so the new midpoint is lower, producing a less aggressive + right-move next time. + Adjustment: parent_start -= nudge + + Scenario 2: Parent reached → searched left → child FAIL + Parent's left-move was too aggressive. Nudge parent_end right + (increase) so the new midpoint is higher, producing a less aggressive + left-move next time. + Adjustment: parent_end += nudge + + Multi-layer backtrack (new parent also fails → pop to grandparent): + Before applying the nudge to grandparent, merge boundaries to preserve + the wider search space explored by the failed descendant: + grandparent = (min(gp_start, failed_start), max(gp_end, failed_end)) + Then apply the same nudge logic based on grandparent's direction. + + Nudge size: proportional to parent range — max(1, range_size // 10). + + 6 scenario walkthrough: see unit tests in test_threshold_range_probing_algorithm.py + +Key principles: +1. Pure algorithm logic - no hardware/platform dependencies +2. Executor-agnostic through protocol interface +3. Binary search with dynamic precision control +4. Stack-based backtracking with anti-oscillation for verification failures +""" + +import sys +from typing import Optional, Tuple + +# Import model setup for both production and testing environments +if __package__ in (None, ""): + import os + _this_dir = os.path.dirname(os.path.abspath(__file__)) + _saitests_dir = os.path.dirname(_this_dir) + if _saitests_dir not in sys.path: + sys.path.insert(0, _saitests_dir) + __package__ = "probe" + +from probing_executor_protocol import ProbingExecutorProtocol +from probing_observer import ProbingObserver +from iteration_outcome import IterationOutcome + + +class ThresholdRangeProbingAlgorithm: + """ + Unified Threshold Range Detection Algorithm + + Implements Phase 3: Precision Range Detection using binary search with adaptive + precision control, providing the final threshold range for detection. + + This algorithm works with ANY executor implementing ProbingExecutorProtocol: + - PfcxoffProbingExecutor + - IngressDropProbingExecutor + - MockExecutors + - Future executor types + + Strategy: + - Binary search within [lower_bound, upper_bound] + - Adaptive termination: range_size <= candidate_threshold * 5% OR fixed range limit + - Stack-based backtracking with anti-oscillation nudge + - Noise-resilient verification (configurable attempts) + """ + + def __init__(self, executor: ProbingExecutorProtocol, + observer: ProbingObserver, + precision_target_ratio: float = 0.05, + verification_attempts: int = 5, + enable_precise_detection: bool = False, + precise_detection_range_limit: int = 100): + """ + Initialize threshold range probing algorithm + + Args: + executor: Any executor implementing ProbingExecutorProtocol + observer: Result tracking and reporting (unified ProbingObserver) + precision_target_ratio: Dynamic precision target (default 5%) + verification_attempts: How many times to repeat each check and require consistency + enable_precise_detection: Enable precise step-by-step detection mode + precise_detection_range_limit: Range limit for precise detection (default 100) + """ + self.executor = executor + self.observer = observer + self.precision_target_ratio = precision_target_ratio + self.verification_attempts = verification_attempts + self.enable_precise_detection = enable_precise_detection + self.precise_detection_range_limit = precise_detection_range_limit + + @staticmethod + def _backtrack_nudge(range_start, range_end): + """Calculate nudge size for anti-oscillation backtrack. + + Proportional to current range: large ranges get larger nudges to + shift the midpoint meaningfully, small ranges get minimal nudges. + + Returns: + int: nudge size (at least 1, proportional to range_size // 10) + """ + return max(1, (range_end - range_start) // 10) + + def run(self, src_port: int, dst_port: int, + lower_bound: int, upper_bound: int, **traffic_keys) -> Tuple[Optional[int], Optional[int], float]: + """ + Run threshold range detection algorithm + + Args: + src_port: Source port for traffic generation + dst_port: Destination port for threshold detection + lower_bound: Lower bound discovered from Phase 2 + upper_bound: Upper bound discovered from Phase 1 + **traffic_keys: Traffic identification keys (e.g., pg=3, queue=5) + + Returns: + Tuple[lower_bound, upper_bound, phase_time]: Detected threshold range + with phase time or (None, None, 0.0) on failure + """ + try: + # Prepare ports for threshold probing + self.executor.prepare(src_port, dst_port) + + # Phase 3: Precision Range Detection using binary search with + # dynamic precision control and anti-oscillation backtracking + # + # Stack entries: (range_start, range_end, direction) + # direction: how this range was produced by its parent + # 'init' - initial range (no parent) + # 'right' - parent was unreached → searched right + # 'left' - parent was reached → searched left + # 'nudge' - backtracked with anti-oscillation nudge + STEP_LABELS = {'init': 'init', 'right': 'L->', 'left': '<-U', 'nudge': 'L<->U'} + + range_stack = [(lower_bound, upper_bound, 'init')] + iteration = 0 + max_iterations = 50 + phase_time = 0.0 + + while iteration < max_iterations and range_stack: + iteration += 1 + range_start, range_end, direction = range_stack[-1] + candidate_threshold = (range_start + range_end) // 2 + + self.observer.on_iteration_start( + iteration, candidate_threshold, range_start, range_end, + STEP_LABELS.get(direction, direction)) + + # Check dynamic precision target + range_size = range_end - range_start + if self.enable_precise_detection: + precision_reached = range_size <= self.precise_detection_range_limit + else: + precision_reached = range_size <= max(1, int(candidate_threshold * self.precision_target_ratio)) + + if precision_reached: + iteration_time, phase_time = self.observer.on_iteration_complete( + iteration, candidate_threshold, IterationOutcome.SKIPPED) + return (range_start, range_end, phase_time) + + # Noise-resilient verification + success, detected = self.executor.check( + src_port, dst_port, candidate_threshold, + attempts=self.verification_attempts, iteration=iteration, **traffic_keys + ) + + iteration_time, phase_time = self.observer.on_iteration_complete( + iteration, candidate_threshold, + IterationOutcome.from_check_result(detected, success) + ) + + if not success: + # Backtrack with anti-oscillation nudge: + # Pop failed child, nudge parent boundary to shift its + # midpoint, preventing the same child from being produced. + failed_start, failed_end, _ = range_stack.pop() + + if range_stack: + parent_start, parent_end, parent_dir = range_stack[-1] + nudge = self._backtrack_nudge(parent_start, parent_end) + + # Merge: preserve the wider search space from failed child + merged_start = min(parent_start, failed_start) + merged_end = max(parent_end, failed_end) + + # Nudge in the direction opposite to parent's last move + if parent_dir in ('right', 'init'): + merged_start = max(0, merged_start - nudge) # Soften right-move + else: + merged_end += nudge # Soften left-move + + range_stack[-1] = (merged_start, merged_end, 'nudge') + else: + if detected: + range_stack.append((range_start, candidate_threshold, 'left')) + else: + range_stack.append((candidate_threshold + 1, range_end, 'right')) + + # Unified error handling after while loop exit + if iteration >= max_iterations: + self.observer.on_error(f"Threshold range detection exceeded maximum iterations ({max_iterations})") + elif not range_stack: + # Stack exhausted due to backtracking failures + self.observer.on_error("Threshold range backtrack exhausted all ranges") + else: + # Defensive - should not happen under normal conditions + self.observer.on_error("Threshold range detection terminated unexpectedly") + + return (None, None, phase_time) + + except Exception as e: + self.observer.on_error(f"Threshold range detection algorithm execution failed: {e}") + return (None, None, 0.0) diff --git a/tests/saitests/probe/upper_bound_probing_algorithm.py b/tests/saitests/probe/upper_bound_probing_algorithm.py new file mode 100644 index 00000000000..2e9636cb06a --- /dev/null +++ b/tests/saitests/probe/upper_bound_probing_algorithm.py @@ -0,0 +1,129 @@ +""" +Upper Bound Probing Algorithm - Unified Implementation + +Generic upper bound discovery algorithm that works with any probing type +(PFC Xoff, Ingress Drop, etc.) through the ProbingExecutorProtocol interface. + +Phase 1 Strategy: +- Start from buffer_pool_size as initial value +- Exponentially increase (x2) until threshold triggered +- Typically reaches threshold in one iteration since initial value uses buffer pool size +- Single verification attempt for speed optimization + +Key principles: +1. Pure algorithm logic - no hardware/platform dependencies +2. Executor-agnostic through protocol interface +3. Exponential growth for rapid convergence +4. Algorithm/Executor/Observer separation for clean testing +""" + +import sys +from typing import Optional, Tuple + +# Import model setup for both production and testing environments +if __package__ in (None, ""): + import os + _this_dir = os.path.dirname(os.path.abspath(__file__)) + _saitests_dir = os.path.dirname(_this_dir) + if _saitests_dir not in sys.path: + sys.path.insert(0, _saitests_dir) + __package__ = "probe" + +from probing_executor_protocol import ProbingExecutorProtocol +from probing_observer import ProbingObserver +from iteration_outcome import IterationOutcome + + +class UpperBoundProbingAlgorithm: + """ + Unified Upper Bound Discovery Algorithm + + Implements Phase 1: Upper Bound Discovery using exponential growth (x2) + until threshold is triggered, providing the upper boundary for subsequent phases. + + This algorithm works with ANY executor implementing ProbingExecutorProtocol: + - PfcxoffProbingExecutor + - IngressDropProbingExecutor + - MockExecutors + - Future executor types + + Strategy: + - Start from buffer_pool_size + - Exponentially increase (x2) until threshold triggered + - Single verification for speed + - Safety limit to prevent infinite loops + """ + + def __init__(self, executor: ProbingExecutorProtocol, observer: ProbingObserver, + verification_attempts: int = 1): + """ + Initialize upper bound probing algorithm + + Args: + executor: Any executor implementing ProbingExecutorProtocol + observer: Result tracking and reporting (unified ProbingObserver) + verification_attempts: How many times to repeat the same check and require consistency + """ + self.executor = executor + self.observer = observer + self.verification_attempts = verification_attempts + + def run(self, src_port: int, dst_port: int, initial_value: int, **traffic_keys) -> Tuple[Optional[int], float]: + """ + Run upper bound discovery algorithm + + Args: + src_port: Source port for traffic generation + dst_port: Destination port for threshold detection + initial_value: Starting value (typically buffer_pool_size) + **traffic_keys: Traffic identification keys (e.g., pg=3, queue=5) + + Returns: + Tuple[Optional[int], float]: (upper_bound, phase_time) or (None, 0.0) on failure + """ + try: + # Prepare ports for threshold probing + self.executor.prepare(src_port, dst_port) + + # Phase 1: Upper Bound Discovery using exponential growth (x2) + current = initial_value + iteration = 0 + max_iterations = 20 # Safety limit + phase_time = 0.0 # Track cumulative phase time + + while iteration < max_iterations: + iteration += 1 + + # Add search window information for Phase 1 (no upper bound yet) + self.observer.on_iteration_start( + iteration, current, None, None, + "init" if iteration == 1 else "x2" + ) + + # Phase 1: use a single verification attempt for speed + success, detected = self.executor.check( + src_port, dst_port, current, attempts=self.verification_attempts, + iteration=iteration, **traffic_keys + ) + + iteration_time, phase_time = self.observer.on_iteration_complete( + iteration, current, IterationOutcome.from_check_result(detected, success) + ) + + if not success: + self.observer.on_error(f"Upper bound verification failed at iteration {iteration}") + return (None, phase_time) + + if detected: + # Threshold triggered - upper bound found + return (current, phase_time) + else: + # Continue exponential growth + current *= 2 + + self.observer.on_error(f"Upper bound discovery exceeded maximum iterations ({max_iterations})") + return (None, phase_time) + + except Exception as e: + self.observer.on_error(f"Upper bound discovery algorithm execution failed: {e}") + return (None, 0.0)