Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions tests/saitests/probe/iteration_outcome.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Iteration Outcome Enum - Unified Result Type for Probing Iterations

This module defines the IterationOutcome enum that represents the complete
outcome of a single probing iteration, replacing the previous (detected, success)
two-parameter approach with a single, semantically clear enum value.

Design Rationale:
- Simplifies API: one parameter instead of two boolean-like parameters
- Type-safe: Enum prevents invalid combinations
- Self-documenting: Each value clearly describes what happened
- Matches output: Values correspond directly to markdown table 'Check' column

Note on SKIPPED:
- SKIPPED is NOT a result from executor.check() - it indicates the algorithm
decided not to call check() because the precision target was already met.
- This is a valid iteration outcome that should be reported to the observer.
- It's included here because IterationOutcome represents "what happened in this
iteration", not just "what did check() return".
"""

from enum import Enum


class IterationOutcome(Enum):
"""
Outcome of a single probing iteration

This enum represents the complete outcome of an iteration, including:
1. Cases where executor.check() was called and returned a result
2. Cases where check was intentionally skipped by the algorithm

Values directly correspond to the 'Check' column in markdown table output.

Mapping from old API:
REACHED ← detected=True, success=True
UNREACHED ← detected=False, success=True
FAILED ← detected=any, success=False
SKIPPED ← detected=None, success=True (new: check not executed)
"""

# executor.check() called -> threshold was triggered
REACHED = "reached"

# executor.check() called -> threshold was NOT triggered
UNREACHED = "unreached"

# executor.check() called -> verification failed (inconsistent results)
FAILED = "failed"

# executor.check() NOT called -> precision already satisfied, no probe needed
SKIPPED = "skipped"

@classmethod
def from_check_result(cls, detected: bool, success: bool) -> "IterationOutcome":
"""
Convert legacy (detected, success) parameters to IterationOutcome

This helper method supports gradual migration from the old API.

Args:
detected: True if threshold was triggered, False if not
success: True if verification completed without errors

Returns:
Corresponding IterationOutcome value
"""
if not success:
return cls.FAILED
return cls.REACHED if detected else cls.UNREACHED
142 changes: 142 additions & 0 deletions tests/saitests/probe/lower_bound_probing_algorithm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""
Lower Bound Probing Algorithm - Unified Implementation

Generic lower bound detection algorithm that works with any probing type
(PFC Xoff, Ingress Drop, etc.) through the ProbingExecutorProtocol interface.

Phase 2 Strategy:
- Start from upper_bound/2 as initial value
- Logarithmically reduce (/2) until threshold dismissed
- Single verification attempt for speed optimization
- Leverages upper bound result from Phase 1

Key principles:
1. Pure algorithm logic - no hardware/platform dependencies
2. Executor-agnostic through protocol interface
3. Logarithmic reduction for rapid convergence
4. Algorithm/Executor/Observer separation for clean testing
"""

import sys
from typing import Optional, Tuple

# Import model setup for both production and testing environments
if __package__ in (None, ""):
import os
_this_dir = os.path.dirname(os.path.abspath(__file__))
_saitests_dir = os.path.dirname(_this_dir)
if _saitests_dir not in sys.path:
sys.path.insert(0, _saitests_dir)
__package__ = "probe"

from probing_executor_protocol import ProbingExecutorProtocol
from probing_observer import ProbingObserver
from iteration_outcome import IterationOutcome


class LowerBoundProbingAlgorithm:
"""
Unified Lower Bound Detection Algorithm

Implements Phase 2: Lower Bound Detection using logarithmic reduction (/2)
until threshold is dismissed, providing the lower boundary for subsequent phases.

This algorithm works with ANY executor implementing ProbingExecutorProtocol:
- PfcxoffProbingExecutor
- IngressDropProbingExecutor
- MockExecutors
- Future executor types

Strategy:
- Start from upper_bound/2
- Logarithmically reduce (/2) until threshold dismissed
- Single verification for speed
- Safety limit to prevent infinite loops
"""

def __init__(self, executor: ProbingExecutorProtocol, observer: ProbingObserver,
verification_attempts: int = 1):
"""
Initialize lower bound probing algorithm

Args:
executor: Any executor implementing ProbingExecutorProtocol
observer: Result tracking and reporting (unified ProbingObserver)
verification_attempts: How many times to repeat the same check and require consistency
"""
self.executor = executor
self.observer = observer
self.verification_attempts = verification_attempts

def run(self, src_port: int, dst_port: int, upper_bound: int,
start_value: int = None, **traffic_keys) -> Tuple[Optional[int], float]:
"""
Run lower bound detection algorithm

Args:
src_port: Source port for traffic generation
dst_port: Destination port for threshold detection
upper_bound: Upper bound discovered from Phase 1
start_value: Optional starting value for lower bound search (optimization).
If provided, skip the normal upper_bound/2 calculation and start from this value.
**traffic_keys: Traffic identification keys (e.g., pg=3, queue=5)
Useful when we know a value that definitely won't trigger the threshold.
For example, for Ingress Drop, use (pfc_xoff_threshold - 1) since Drop >= XOFF.

Returns:
Tuple[Optional[int], float]: (lower_bound, phase_time) or (None, 0.0) on failure
"""
try:
# Prepare ports for threshold probing
self.executor.prepare(src_port, dst_port)

# Phase 2: Lower Bound Detection using logarithmic reduction (/2)
# OPTIMIZATION: Use start_value if provided, otherwise default to upper_bound/2
if start_value is not None:
current = start_value
else:
current = upper_bound // 2
iteration = 0
max_iterations = 20 # Safety limit
phase_time = 0.0 # Track cumulative phase time

while iteration < max_iterations and current >= 1:
iteration += 1

# Add search window information for Phase 2 (no lower bound yet, only upper)
self.observer.on_iteration_start(
iteration, current, None, upper_bound,
"init" if iteration == 1 else "/2"
)

# Phase 2: use a single verification attempt for speed
success, detected = self.executor.check(
src_port, dst_port, current, attempts=self.verification_attempts,
iteration=iteration, **traffic_keys
)

iteration_time, phase_time = self.observer.on_iteration_complete(
iteration, current, IterationOutcome.from_check_result(detected, success)
)

if not success:
self.observer.on_error(f"Lower bound verification failed at iteration {iteration}")
return (None, phase_time)

if not detected:
# Threshold dismissed - lower bound found
return (current, phase_time)
else:
# Continue logarithmic reduction
if current <= 1:
# Cannot reduce below 1 — threshold is reached even at minimum
break
current = current // 2

self.observer.on_error(
"Lower bound detection exceeded maximum iterations or reached minimum value")
return (None, phase_time)

except Exception as e:
self.observer.on_error(f"Lower bound detection algorithm execution failed: {e}")
return (None, 0.0)
76 changes: 76 additions & 0 deletions tests/saitests/probe/probing_executor_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
Probing Executor Protocol - Unified Interface

Defines the standard interface that all threshold probing executors must implement.
This protocol enables algorithm code to be completely executor-agnostic.

Design Philosophy:
- Define common interface without implementation
- Enable algorithm reuse across different probing types (PFC Xoff, Ingress Drop, etc.)
- Support both physical device and mock executors
- Type-safe with Python's Protocol (structural subtyping)

Usage:
def my_algorithm(executor: ProbingExecutorProtocol):
executor.prepare(src, dst)
success, detected = executor.check(src, dst, value)
# Works with ANY executor implementing this protocol
"""

from typing import Protocol, Tuple, runtime_checkable


@runtime_checkable
class ProbingExecutorProtocol(Protocol):
"""
Protocol defining the standard executor interface for threshold probing

All probing executors (PfcxoffProbingExecutor, IngressDropProbingExecutor, etc.)
must implement these methods to be compatible with unified algorithms.

This is a structural protocol - classes don't need to explicitly inherit from it.
If a class has these methods with matching signatures, it automatically satisfies
the protocol.
"""

def prepare(self, src_port: int, dst_port: int) -> None:
"""
Prepare ports for threshold detection

Ensures clean buffer state before probing begins.
Typically involves draining buffers and setting up congestion conditions.

Args:
src_port: Source port for traffic generation
dst_port: Destination port for threshold detection
"""
...

Check notice

Code scanning / CodeQL

Statement has no effect Note test

This statement has no effect.

def check(self, src_port: int, dst_port: int, value: int,
attempts: int = 1, drain_buffer: bool = True,
iteration: int = 0, **traffic_keys) -> Tuple[bool, bool]:
"""
Check if threshold is reached at given value

Standard 5-step verification process:
1. Port preparation (optional via drain_buffer)
2. Baseline measurement
3. Traffic injection
4. Wait for counter refresh
5. Threshold detection

Args:
src_port: Source port for traffic generation
dst_port: Destination port for threshold detection
value: Packet count to test
attempts: Number of verification attempts for consistency
drain_buffer: Whether to drain buffer before testing
iteration: Current iteration number (for metrics tracking)
**traffic_keys: Traffic identification keys (e.g., pg=3, queue=5)

Returns:
Tuple[success, detected]:
- success: True if verification completed without errors
- detected: True if threshold was triggered at this value
"""
...

Check notice

Code scanning / CodeQL

Statement has no effect Note test

This statement has no effect.
95 changes: 95 additions & 0 deletions tests/saitests/probe/probing_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""
Probing Result Data Class
Defines standardized return type for all probing test cases.
Using dataclass to ensure consistent structure and type safety.
Design principles:
1. Unified format: All probing tests return ThresholdResult
2. Type safety: Dataclass enforces field types
3. Range/Point unification: Point is special case of Range (lower == upper)
4. Nullable: Use Optional for failure cases
Usage:
- PfcXoffProbing: Returns ThresholdResult with PFC XOFF threshold
- IngressDropProbing: Returns ThresholdResult with Ingress Drop threshold
- HeadroomPoolProbing: Returns ThresholdResult with total pool size
"""

from dataclasses import dataclass
from typing import Optional


@dataclass
class ThresholdResult:
"""
Unified threshold detection result for all probing types.
Used by:
- PfcXoffProbing: threshold = PFC XOFF threshold
- IngressDropProbing: threshold = Ingress Drop threshold
- HeadroomPoolProbing: threshold = total headroom pool size
Attributes:
lower_bound: Lower bound of threshold range (or exact point if lower == upper)
upper_bound: Upper bound of threshold range (or exact point if lower == upper)
success: Whether detection succeeded
phase_time: Time spent in this phase (seconds), set by algorithm from observer
Notes:
- For range: lower_bound < upper_bound
- For point: lower_bound == upper_bound (precise detection)
- For failure: lower_bound = upper_bound = None, success = False
"""
lower_bound: Optional[int]
upper_bound: Optional[int]
success: bool
phase_time: float = 0.0 # Time in seconds for this phase

@classmethod
def from_bounds(cls, lower: Optional[int], upper: Optional[int]) -> 'ThresholdResult':
"""Create ThresholdResult from lower/upper bounds."""
success = lower is not None and upper is not None
return cls(lower_bound=lower, upper_bound=upper, success=success)

@classmethod
def failed(cls) -> 'ThresholdResult':
"""Create a failed result."""
return cls(lower_bound=None, upper_bound=None, success=False)

@property
def is_point(self) -> bool:
"""Check if result is a precise point (lower == upper)."""
return (self.success and self.lower_bound is not None
and self.upper_bound is not None
and self.lower_bound == self.upper_bound)

@property
def is_range(self) -> bool:
"""Check if result is a range (lower < upper)."""
return (self.success and self.lower_bound is not None
and self.upper_bound is not None
and self.lower_bound < self.upper_bound)

@property
def value(self) -> Optional[int]:
"""Get threshold value (for point) or lower bound (for range)."""
return self.lower_bound

@property
def candidate(self) -> Optional[int]:
"""Get candidate threshold (midpoint for range, exact value for point)."""
if not self.success:
return None
if self.is_point:
return self.lower_bound
# For range: return midpoint
return (self.lower_bound + self.upper_bound) // 2

def __repr__(self) -> str:
if not self.success:
return "ThresholdResult(FAILED)"
elif self.is_point:
return f"ThresholdResult(point={self.lower_bound})"
else:
return f"ThresholdResult(range=[{self.lower_bound}, {self.upper_bound}])"
Loading
Loading