feat: Streaming Kolmogorov-Smirnov test for drift detection #99

SudipSinha wants to merge 7 commits into
Conversation
Reviewer's Guide

Implements a Greenwald–Khanna quantile sketch and a streaming two-sample Kolmogorov–Smirnov drift metric built on top of it, exposes the metric via new FastAPI endpoints (with deprecated aliases), and adds extensive tests and documentation validating correctness, error bounds, and API behavior.

Sequence diagram for /metrics/drift/ksteststreaming computation flow

sequenceDiagram
actor Client
participant FastAPI_router
participant DriftEndpoints
participant DataSource
participant KS as KolmogorovSmirnovStreaming
Client->>FastAPI_router: POST /metrics/drift/ksteststreaming
FastAPI_router->>DriftEndpoints: compute_ksteststreaming(request)
DriftEndpoints->>DataSource: get_dataframe_by_tag(model_id, reference_tag)
DataSource-->>DriftEndpoints: reference_df
DriftEndpoints->>DataSource: get_organic_dataframe(model_id, batch_size)
DataSource-->>DriftEndpoints: current_df
DriftEndpoints->>KS: __init__(epsilon)
loop for_each_feature_in_fit_columns
DriftEndpoints->>KS: insert_reference_batch(reference_values)
DriftEndpoints->>KS: insert_current_batch(current_values)
DriftEndpoints->>KS: kstest(alpha)
KS-->>DriftEndpoints: result_per_feature
end
DriftEndpoints-->>FastAPI_router: aggregated_result
FastAPI_router-->>Client: JSON response
Class diagram for streaming KS test and GK quantile sketch

classDiagram
class GreenwaldKhannaSketch {
+float epsilon
+int n
+list summary
+list _cumulative_r_max
+bool _cumulative_cache_valid
+__init__(epsilon float)
+insert(value float) void
+delete(value float) void
+quantile(phi float) float
+rank(value float) float
+cdf(value float) float
+min() float
+max() float
+__len__() int
+size() int
+merge(other GreenwaldKhannaSketch) GreenwaldKhannaSketch
+to_dict() dict
+from_dict(data dict) GreenwaldKhannaSketch
-_ensure_cumulative_cache() void
-_compress() void
}
class KolmogorovSmirnovStreaming {
+float epsilon
-GreenwaldKhannaSketch _reference_sketch
-GreenwaldKhannaSketch _current_sketch
+__init__(epsilon float)
+insert_reference(value float) void
+insert_current(value float) void
+insert_reference_batch(values list) void
+insert_current_batch(values list) void
+statistic() float
+p_value() float
+kstest(alpha float) dict
+reset_reference() void
+reset_current() void
+reset() void
+n_reference() int
+n_current() int
+reference_sketch() GreenwaldKhannaSketch
+current_sketch() GreenwaldKhannaSketch
+to_dict() dict
+from_dict(data dict) KolmogorovSmirnovStreaming
}
class ApproxKSTestMetricRequest {
+str model_id
+Optional~str~ metric_name
+Optional~str~ request_name
+int batch_size
+float threshold_delta
+Optional~str~ reference_tag
+list fit_columns
+float epsilon
+retrieve_tags() dict
}
class ScheduleId {
+str requestId
}
class PrometheusScheduler {
+register(metric_name str, request_id uuid, request ApproxKSTestMetricRequest) void
+delete(metric_name str, request_id uuid) void
+get_requests(metric_name str) dict
}
class DataSource {
+get_dataframe_by_tag(model_id str, reference_tag str)
+get_organic_dataframe(model_id str, batch_size int)
}
class DriftEndpoints {
+compute_ksteststreaming(request ApproxKSTestMetricRequest) dict
+get_ksteststreaming_definition() dict
+schedule_ksteststreaming(request ApproxKSTestMetricRequest) dict
+delete_ksteststreaming_schedule(schedule ScheduleId) dict
+list_ksteststreaming_requests() dict
+compute_approxkstest_deprecated(request ApproxKSTestMetricRequest) dict
+get_approxkstest_definition_deprecated() dict
+schedule_approxkstest_deprecated(request ApproxKSTestMetricRequest) dict
+delete_approxkstest_schedule_deprecated(schedule ScheduleId) dict
+list_approxkstest_requests_deprecated() dict
}
KolmogorovSmirnovStreaming --> GreenwaldKhannaSketch : uses
DriftEndpoints --> KolmogorovSmirnovStreaming : computes_metric_with
DriftEndpoints --> DataSource : loads_data_from
DriftEndpoints --> PrometheusScheduler : schedules_with
DriftEndpoints --> ApproxKSTestMetricRequest : accepts
DriftEndpoints --> ScheduleId : identifies_schedule
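The per-feature flow in the two diagrams can be sketched end to end. The following is a hypothetical stand-in, not the PR's code: exact sorted lists replace the ε-approximate GK sketches, and the p-value uses the asymptotic Q_KS series rather than the PR's `scipy.stats.kstwo` call; only the method names mirror the class diagram.

```python
import math
from bisect import bisect_right, insort

class ExactKSStream:
    """Toy stand-in for KolmogorovSmirnovStreaming: exact lists, not GK sketches."""

    def __init__(self):
        self._ref, self._cur = [], []

    def insert_reference_batch(self, values):
        for v in values:
            insort(self._ref, v)

    def insert_current_batch(self, values):
        for v in values:
            insort(self._cur, v)

    def statistic(self):
        # D = sup_x |F_ref(x) - F_cur(x)|, checked at every observed point
        n1, n2 = len(self._ref), len(self._cur)
        return max(
            abs(bisect_right(self._ref, x) / n1 - bisect_right(self._cur, x) / n2)
            for x in self._ref + self._cur
        )

    def kstest(self, alpha=0.05):
        d = self.statistic()
        n1, n2 = len(self._ref), len(self._cur)
        n_eff = n1 * n2 / (n1 + n2)  # effective sample size for two samples
        if d == 0.0:
            p = 1.0
        else:
            # Asymptotic Q_KS survival series (Numerical Recipes form), an
            # assumption here; the PR itself uses scipy.stats.kstwo instead.
            lam = (math.sqrt(n_eff) + 0.12 + 0.11 / math.sqrt(n_eff)) * d
            s = sum((-1) ** (k - 1) * math.exp(-2.0 * (k * lam) ** 2)
                    for k in range(1, 101))
            p = min(max(2.0 * s, 0.0), 1.0)
        return {"statistic": d, "p_value": p, "alpha": alpha,
                "drift_detected": p < alpha}

ks = ExactKSStream()
ks.insert_reference_batch([0.1, 0.2, 0.3, 0.4, 0.5])
ks.insert_current_batch([1.1, 1.2, 1.3, 1.4, 1.5])
result = ks.kstest(alpha=0.05)
print(result["statistic"])  # 1.0 -- the two samples do not overlap at all
```

Under the PR's design, the exact lists above are where the GK sketches earn their keep: they answer the same CDF queries in O(1/ε log(εn)) space instead of storing every observation.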
File-Level Changes
PR image build and manifest generation completed successfully! 📦 PR image: 🗂️ CI manifests
Hey - I've found 12 issues and left some high-level feedback:
- In `delete_ksteststreaming_schedule`, the `HTTPException` raised for an invalid `requestId` is immediately caught and converted to a 500 response; you probably want to either special-case `HTTPException` in the `except` block (as you did in `compute_ksteststreaming`) or narrow the outer exception handler so that the 400 status for bad UUIDs is preserved.
- The `test_delete_invalid_uuid` test expects a 500 response for an invalid `requestId` with a comment saying the endpoint returns 500, but the implementation now returns 400; align either the test expectation or the endpoint behavior so they match.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `delete_ksteststreaming_schedule`, the `HTTPException` raised for an invalid `requestId` is immediately caught and converted to a 500 response; you probably want to either special-case `HTTPException` in the `except` block (as you did in `compute_ksteststreaming`) or narrow the outer exception handler so that the 400 status for bad UUIDs is preserved.
- The `test_delete_invalid_uuid` test expects a 500 response for an invalid `requestId` with a comment saying the endpoint returns 500, but the implementation now returns 400; align either the test expectation or the endpoint behavior so they match.
## Individual Comments
### Comment 1
<location path="src/core/metrics/drift/greenwald_khanna_quantile_sketch.py" line_range="66-75" />
<code_context>
+ return
+
+ pos = bisect.bisect(self.summary, value, key=lambda x: x[0])
+ compress_period = floor(1 / (2 * self.epsilon))
+
+ if pos == 0 or pos == len(self.summary):
+ g_i, delta_i = 1, 0
+ elif self.n <= compress_period:
+ # Early elements use Δ = 0 for stability
+ g_i, delta_i = 1, 0
+ else:
+ # Middle insertions: Δ = ⌊2εn⌋ - 1 (corrected from paper's ⌊2εn⌋ to maintain invariant)
+ # See: https://www.stevenengelhardt.com/2018/03/07/calculating-percentiles-on-streaming-data-part-2-notes-on-implementing-greenwald-khanna/
+ g_i = 1
+ delta_i = max(0, floor(2 * self.epsilon * self.n) - 1)
+
+ self.summary.insert(pos, (value, g_i, delta_i))
+ self._cumulative_cache_valid = False
+
+ if self.n % compress_period == 0:
+ self._compress()
+
</code_context>
<issue_to_address>
**issue:** When epsilon > 0.5, compress_period becomes 0 and will cause a ZeroDivisionError.
Because `1 / (2 * self.epsilon) < 1` when epsilon > 0.5, `floor(1 / (2 * self.epsilon))` becomes 0, which makes the `self.n % compress_period` check (and the same logic in `delete()`) raise `ZeroDivisionError`. Please either restrict epsilon to `(0, 0.5]` or handle a zero/too-small period (e.g., clamp `compress_period` to at least 1 or skip compression) so that valid `(0, 1)` epsilon values cannot crash the code.
</issue_to_address>
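A minimal guard for the division, sketched under the assumption that clamping (rather than restricting ε at construction, the review's other option) is the chosen fix; `compress_period` is written here as a free function rather than the PR's inline expression:

```python
from math import floor

def compress_period(epsilon: float) -> int:
    # floor(1 / (2 * eps)) is 0 whenever eps > 0.5, so `self.n % period`
    # would raise ZeroDivisionError; clamping keeps the period >= 1.
    return max(1, floor(1.0 / (2.0 * epsilon)))

print(compress_period(0.01))  # 50: compress every 50 inserts
print(compress_period(0.6))   # 1 (would be 0, i.e. a crash, without the clamp)
```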
### Comment 2
<location path="src/core/metrics/drift/greenwald_khanna_quantile_sketch.py" line_range="270-279" />
<code_context>
+ def merge(self, other: "GreenwaldKhannaSketch") -> "GreenwaldKhannaSketch":
</code_context>
<issue_to_address>
**issue (bug_risk):** Simple concatenation of two GK summaries may violate GK error guarantees.
The current `merge` just concatenates two sorted summaries and calls `_compress()`, but GK summaries are not mergeable this way: `g` and `Δ` encode ranks relative to different streams. This likely breaks the GK invariants and weakens or voids the ε-approximation guarantees. If you need strict GK behavior, either rebuild from the raw data (if available) or implement a proper merge that recomputes `g`/`Δ` per the original GK algorithm or mergeable variants in the literature.
</issue_to_address>
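To make the stream-relative nature of `g` and `Δ` concrete, here is a toy illustration (not the PR's code; the summary tuples are made up) of the rank bounds each GK tuple encodes. Concatenating two summaries leaves every tuple's bounds relative to the wrong, half-sized stream, which is why the invariant `g + Δ ≤ 2εn` can fail against the doubled `n`:

```python
def rank_bounds(summary):
    # Each GK tuple (value, g, delta) bounds the value's rank in ITS OWN
    # stream: r_min(i) = sum of g up to i, r_max(i) = r_min(i) + delta_i.
    bounds, r_min = [], 0
    for value, g, delta in summary:
        r_min += g
        bounds.append((value, r_min, r_min + delta))
    return bounds

s = [(1.0, 1, 0), (2.0, 2, 1), (5.0, 1, 0)]
print(rank_bounds(s))  # [(1.0, 1, 1), (2.0, 3, 4), (5.0, 4, 4)]
```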
### Comment 3
<location path="src/core/metrics/drift/kolmogorov_smirnov_streaming.py" line_range="170-179" />
<code_context>
+ """
+ Compute the approximate p-value for the two-sample KS test.
+
+ Uses scipy's kstwo distribution (exact two-sample KS distribution)
+ with the effective sample size for the asymptotic approximation.
+
+ :return: Approximate p-value
</code_context>
<issue_to_address>
**suggestion:** The docstring claims kstwo is an exact two-sample KS distribution, which is not accurate.
SciPy’s `kstwo` models the one-sample KS statistic for a sample of size `n`; it is not an exact two-sample KS distribution. Here it’s being used with an effective sample size `(n1 * n2) / (n1 + n2)` as an asymptotic approximation. Please rephrase the docstring to describe this as an approximation using the one-sample KS distribution with an effective sample size, rather than an exact two-sample distribution.
```suggestion
def p_value(self) -> float:
"""
Compute the approximate p-value for the two-sample KS test.
Uses SciPy's one-sample KS distribution (`scipy.stats.kstwo`) evaluated
at an effective sample size (n1 * n2) / (n1 + n2) as an asymptotic
approximation to the two-sample KS p-value.
:return: Approximate p-value
:raises ValueError: If either sketch is empty
"""
```
</issue_to_address>
### Comment 4
<location path="src/core/metrics/drift/kolmogorov_smirnov_streaming.py" line_range="213-214" />
<code_context>
+ - epsilon: Error parameter used
+ :raises ValueError: If either sketch is empty
+ """
+ stat = self.statistic()
+ p_val = self.p_value()
+
+ return {
</code_context>
<issue_to_address>
**suggestion (performance):** kstest() recomputes the KS statistic twice, which is unnecessary work.
`kstest()` calls `self.statistic()` and then `self.p_value()`, but `p_value()` recomputes the statistic internally, doubling the cost of the statistic and merge-scan. Please refactor `p_value()` to optionally take a precomputed statistic (e.g. `p_value(self, d_stat: float | None = None)`) so `kstest()` can reuse the value it already computed.
</issue_to_address>
### Comment 5
<location path="src/endpoints/metrics/drift/kolmogorov_smirnov_streaming.py" line_range="50-59" />
<code_context>
+ fit_columns: List[str] = Field(default_factory=list, alias="fitColumns")
+
+ # Streaming-specific field: epsilon for GK sketch accuracy
+ epsilon: float = Field(default=0.01, description="Error parameter for GK sketch (default: 0.01)")
+
+ def retrieve_tags(self) -> Dict[str, str]:
+ """Retrieve tags for this ApproxKSTest metric request."""
+ tags = self.retrieve_default_tags()
+ if self.reference_tag:
+ tags["referenceTag"] = self.reference_tag
+ if self.fit_columns:
+ tags["fitColumns"] = ",".join(self.fit_columns)
+ if self.epsilon:
+ tags["epsilon"] = str(self.epsilon)
+ return tags
</code_context>
<issue_to_address>
**suggestion (bug_risk):** epsilon is only stored in tags if truthy, and invalid epsilon values will surface as 500s instead of 400s.
Two points:
1) `retrieve_tags` uses `if self.epsilon:`, so an explicit `epsilon=0.0` won’t be tagged even though it differs from the default. To reflect the configured value, check `epsilon is not None` instead.
2) The request model doesn’t enforce `epsilon` in `(0, 1)`, but the underlying sketches do and raise `ValueError` for out-of-range values. These end up as 500s via the generic `except Exception` in `compute_ksteststreaming`, when they should be treated as 4xx client errors. Consider either constraining `epsilon` at the Pydantic level (e.g., a constrained float) or catching `ValueError` explicitly and mapping it to a 400 with a clear message.
Suggested implementation:
```python
from pydantic import BaseModel, Field, confloat
```
```python
# Streaming-specific field: epsilon for GK sketch accuracy
# Must be in the open interval (0, 1) to satisfy GK sketch requirements.
epsilon: confloat(gt=0.0, lt=1.0) = Field(
default=0.01,
description="Error parameter for GK sketch; must be in (0, 1). Default: 0.01",
)
```
```python
if self.fit_columns:
tags["fitColumns"] = ",".join(self.fit_columns)
# Always reflect explicitly configured epsilon, even if it's 0.0
if self.epsilon is not None:
tags["epsilon"] = str(self.epsilon)
return tags
```
If `confloat` is not used elsewhere in the codebase or you are on Pydantic v2, you may instead prefer to:
1. Use `Annotated[float, Field(gt=0.0, lt=1.0)]` with `from typing import Annotated`, or
2. Use the Pydantic v2 constrained types API.
Adjust the import and the `epsilon` field type accordingly to match your existing Pydantic version and style.
</issue_to_address>
### Comment 6
<location path="tests/core/metrics/drift/test_greenwald_khanna_quantile_sketch.py" line_range="16" />
<code_context>
+from src.core.metrics.drift.greenwald_khanna_quantile_sketch import GreenwaldKhannaSketch
+
+
+class TestGreenwaldKhannaSketch:
+ """Test suite for GreenwaldKhannaSketch class."""
+
</code_context>
<issue_to_address>
**suggestion (testing):** Add explicit tests for GK sketch serialization (to_dict/from_dict), including invalid payloads.
Since `GreenwaldKhannaSketch.to_dict()` and `.from_dict()` are part of the public API, they should be covered by tests. Please add cases that:
- Build a sketch, insert data, serialize with `to_dict()`, reconstruct with `from_dict()`, and assert that core properties and queries (`epsilon`, `n`, `summary`, `quantile`, `min`, `max`, and ideally `rank`) match before and after.
- Assert that `from_dict()` raises `ValueError` for malformed inputs (missing keys, wrong types for `summary`, non-dict payloads).
This will help prevent regressions in sketch persistence/transport behavior.
</issue_to_address>
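The requested round-trip and malformed-input tests could take roughly this shape. The block runs against a minimal stand-in class so it is self-contained; real tests would import `GreenwaldKhannaSketch` from `src.core.metrics.drift.greenwald_khanna_quantile_sketch`, and its actual `to_dict()` keys may differ from those assumed here.

```python
class StubSketch:
    """Stand-in for GreenwaldKhannaSketch with the same serialization surface."""

    def __init__(self, epsilon):
        self.epsilon, self.n, self.summary = epsilon, 0, []

    def insert(self, value):
        self.summary.append((value, 1, 0))  # naive (value, g, delta) tuple
        self.summary.sort()
        self.n += 1

    def to_dict(self):
        return {"epsilon": self.epsilon, "n": self.n,
                "summary": [list(t) for t in self.summary]}

    @classmethod
    def from_dict(cls, data):
        if not isinstance(data, dict):
            raise TypeError("expected dict")
        try:
            sk = cls(data["epsilon"])
            sk.n = data["n"]
            sk.summary = [tuple(t) for t in data["summary"]]
        except KeyError as exc:
            raise ValueError(f"missing key: {exc}") from exc
        return sk

def test_round_trip():
    sk = StubSketch(0.01)
    for v in [3.0, 1.0, 2.0]:
        sk.insert(v)
    restored = StubSketch.from_dict(sk.to_dict())
    assert (restored.epsilon, restored.n, restored.summary) == (0.01, 3, sk.summary)

def test_malformed():
    try:
        StubSketch.from_dict({"epsilon": 0.01})  # missing "n" and "summary"
    except ValueError:
        pass
    else:
        raise AssertionError("expected ValueError for malformed payload")

test_round_trip()
test_malformed()
print("serialization round-trip and malformed-input checks passed")
```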
### Comment 7
<location path="tests/core/metrics/drift/test_kolmogorov_smirnov_streaming.py" line_range="211" />
<code_context>
+ assert abs(approx_stat - exact_stat) < 4 * epsilon + 0.1
+
+
+class TestKolmogorovSmirnovStreamingRegression:
+ """Regression tests with fixed data."""
+
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for KolmogorovSmirnovStreaming serialization via to_dict/from_dict to guarantee state round-trips.
Since `KolmogorovSmirnovStreaming` exposes `to_dict()`/`from_dict()`, add tests that serialize an instance with reference/current data, reconstruct it, and compare `epsilon`, `n_reference`, `n_current`, and `statistic()`/`p_value()` (within numeric tolerance) between original and restored instances. Also consider a negative test to ensure `from_dict()` fails cleanly on malformed input (e.g., missing `reference_sketch` or `current_sketch`).
Suggested implementation:
```python
class TestKolmogorovSmirnovStreamingRegression:
"""Regression tests with fixed data, including serialization round-trips."""
def test_serialization_round_trip(self):
epsilon = 0.01
rng = np.random.RandomState(42)
# Fixed reference and current data for regression
ref_data = rng.normal(loc=0.0, scale=1.0, size=500)
cur_data = rng.normal(loc=0.5, scale=1.5, size=600)
ks = KolmogorovSmirnovStreaming(epsilon=epsilon)
ks.insert_reference_batch(ref_data)
ks.insert_current_batch(cur_data)
# Capture key properties and metrics before serialization
orig_epsilon = ks.epsilon
orig_n_reference = ks.n_reference
orig_n_current = ks.n_current
orig_stat = ks.statistic()
orig_p_value = ks.p_value()
# Serialize and deserialize
state = ks.to_dict()
restored_ks = KolmogorovSmirnovStreaming.from_dict(state)
# Compare configuration and internal counts exactly
assert restored_ks.epsilon == orig_epsilon
assert restored_ks.n_reference == orig_n_reference
assert restored_ks.n_current == orig_n_current
# Compare statistic and p_value within numerical tolerance
assert np.isclose(restored_ks.statistic(), orig_stat, rtol=1e-12, atol=1e-12)
assert np.isclose(restored_ks.p_value(), orig_p_value, rtol=1e-12, atol=1e-12)
def test_from_dict_malformed_input(self):
epsilon = 0.01
rng = np.random.RandomState(123)
ref_data = rng.normal(size=100)
cur_data = rng.normal(size=120)
ks = KolmogorovSmirnovStreaming(epsilon=epsilon)
ks.insert_reference_batch(ref_data)
ks.insert_current_batch(cur_data)
state = ks.to_dict()
# Remove required keys to simulate malformed state
malformed_state_missing_reference = dict(state)
malformed_state_missing_reference.pop("reference_sketch", None)
malformed_state_missing_current = dict(state)
malformed_state_missing_current.pop("current_sketch", None)
# from_dict should fail cleanly on malformed input
with pytest.raises((KeyError, ValueError)):
KolmogorovSmirnovStreaming.from_dict(malformed_state_missing_reference)
with pytest.raises((KeyError, ValueError)):
KolmogorovSmirnovStreaming.from_dict(malformed_state_missing_current)
These tests verify the KolmogorovSmirnovStreaming class which implements
```
1. Ensure `KolmogorovSmirnovStreaming` is imported at the top of this test file, for example:
`from alibi_detect.cd.ks import KolmogorovSmirnovStreaming` (or the correct module path used elsewhere in your codebase).
2. If the actual serialized keys used by `to_dict()` differ from `"reference_sketch"`/`"current_sketch"`, update the `pop(...)` calls in `test_from_dict_malformed_input` accordingly so that required fields are removed and `from_dict()` fails as intended.
3. If the API for inserting data uses different method names (e.g., `update_reference`/`update_current` instead of `insert_reference_batch`/`insert_current_batch`), adjust the test setup calls to match.
</issue_to_address>
### Comment 8
<location path="tests/core/metrics/drift/test_kolmogorov_smirnov_streaming.py" line_range="230-239" />
<code_context>
+ # With very small epsilon, should be very close
+ assert abs(approx_stat - exact_stat) < 0.05
+
+ def test_uniform_vs_normal_detects_drift(self):
+ """Test that uniform vs normal distribution is detected."""
+ ks = KolmogorovSmirnovStreaming(epsilon=0.01)
+
+ rng = np.random.default_rng(42)
+ ref_data = rng.uniform(0, 1, 500)
+ cur_data = rng.normal(0.5, 0.2, 500)
+
+ ks.insert_reference_batch(ref_data)
+ ks.insert_current_batch(cur_data)
+
+ result = ks.kstest(alpha=0.05)
+
+ assert result["drift_detected"] is True
</code_context>
<issue_to_address>
**suggestion (testing):** Consider a test that explicitly checks the alpha parameter and drift_detected thresholding in kstest().
Current tests use `alpha=0.05` and assert on `drift_detected`, but they don’t show that varying `alpha` affects the decision. Please add a case where the p-value is near a boundary (e.g. ~0.1), and verify that `drift_detected` is False for `alpha=0.05` but True for a larger `alpha` (e.g. `0.2`), and that the returned `alpha` matches the input. This will more directly exercise the coupling between `alpha` and the decision logic.
Suggested implementation:
```python
result = ks.kstest(alpha=0.05)
assert result["drift_detected"] is True
def test_alpha_parameter_affects_drift_decision(self):
"""Test that alpha parameter controls drift_detected thresholding."""
rng = np.random.default_rng(123)
for _ in range(100):
# Generate two similar distributions where the KS test p-value
# is likely to fall in (0.05, 0.2)
ref_data = rng.normal(0.0, 1.0, 400)
cur_data = rng.normal(0.15, 1.0, 400)
ks = KolmogorovSmirnovStreaming(epsilon=0.01)
ks.insert_reference_batch(ref_data)
ks.insert_current_batch(cur_data)
low_alpha = 0.05
high_alpha = 0.20
result_low = ks.kstest(alpha=low_alpha)
p_value = result_low["p_value"]
# Only proceed when the p-value is in the desired range so that
# alpha=0.05 yields no drift while alpha=0.2 yields drift.
if low_alpha < p_value < high_alpha:
result_high = ks.kstest(alpha=high_alpha)
# p-value should be independent of alpha
assert result_high["p_value"] == pytest.approx(p_value)
# Returned alpha should match the input alpha
assert result_low["alpha"] == low_alpha
assert result_high["alpha"] == high_alpha
# With the same p-value, increasing alpha should flip the decision
assert result_low["drift_detected"] is False
assert result_high["drift_detected"] is True
break
else:
pytest.skip("Could not generate data with p-value between 0.05 and 0.2")
def test_bimodal_vs_unimodal_detects_drift(self):
```
If `pytest` is not already imported at the top of `tests/core/metrics/drift/test_kolmogorov_smirnov_streaming.py`, add `import pytest` alongside the other imports so that `pytest.approx` and `pytest.skip` are available.
</issue_to_address>
### Comment 9
<location path="tests/endpoints/metrics/drift/test_kolmogorov_smirnov_streaming.py" line_range="17-26" />
<code_context>
+class TestKSTestStreamingEndpoints:
+ """Unified endpoint tests for KS Test Streaming metric."""
+
+ # Pandas DataFrame tests
+ test_compute_endpoint_pandas = factory.make_compute_endpoint_test(
+ metric_name="KSTestStreaming",
+ module_path="src.endpoints.metrics.drift.kolmogorov_smirnov_streaming",
+ endpoint_path="/metrics/drift/ksteststreaming",
+ client=client,
+ request_payload={
+ "modelId": "test-model",
+ "referenceTag": "baseline",
+ "fitColumns": ["feature1", "feature2"],
+ "batchSize": 100,
+ "epsilon": 0.01,
+ },
+ expected_response_keys=["status", "value", "drift_detected", "p_value", "alpha", "epsilon"],
+ df_type="Pandas",
+ )
+
+ # Polars DataFrame tests
+ test_compute_endpoint_polars = factory.make_compute_endpoint_test(
+ metric_name="KSTestStreaming",
</code_context>
<issue_to_address>
**suggestion (testing):** Add an endpoint test that asserts multi-feature responses include per-feature results and correct aggregation.
Given the richer payload for multi-feature requests (`feature_results` per feature, top-level `value` as max statistic and `p_value` as min p-value), please add a test that sends multiple `fitColumns` and asserts:
- `feature_results` exists with one entry per feature, each containing the expected keys (`statistic`, `p_value`, `drift_detected`, etc.).
- Top-level `value` equals `max(feature_results[*].statistic)` and top-level `p_value` equals `min(feature_results[*].p_value)`.
If the factory allows inspecting the full response, you can extend an existing compute test to cover these invariants instead of adding a new one from scratch.
Suggested implementation:
```python
client = TestClient(app)
def test_ksteststreaming_multi_feature_aggregation():
"""Ensure multi-feature responses include per-feature results and correct aggregation."""
payload = {
"modelId": "test-model",
"referenceTag": "baseline",
"fitColumns": ["feature1", "feature2"],
"batchSize": 100,
"epsilon": 0.01,
}
response = client.post("/metrics/drift/ksteststreaming", json=payload)
assert response.status_code == 200
body = response.json()
# feature_results exists and has one entry per feature
assert "feature_results" in body
feature_results = body["feature_results"]
assert isinstance(feature_results, list)
assert len(feature_results) == len(payload["fitColumns"])
# each entry has expected keys
expected_feature_keys = {"feature", "statistic", "p_value", "drift_detected"}
for result in feature_results:
assert expected_feature_keys.issubset(result.keys())
# top-level value and p_value aggregate per-feature statistics
max_statistic = max(r["statistic"] for r in feature_results)
min_p_value = min(r["p_value"] for r in feature_results)
assert body["value"] == max_statistic
assert body["p_value"] == min_p_value
```
If the actual per-feature schema differs (for example, if the feature name key is not `"feature"` or the statistic key is named differently), you should adjust `expected_feature_keys` and the accessors in `max_statistic`/`min_p_value` accordingly.
If there is an existing test factory helper that exposes the full response for inspection, you could alternatively migrate this logic into a factory-based test to keep consistency with the test style in the rest of the file.
</issue_to_address>
### Comment 10
<location path="docs/implementation_streaming_ks.md" line_range="18" />
<code_context>
+
+1. **Greenwald-Khanna Quantile Sketch** (`src/core/metrics/drift/greenwald_khanna_quantile_sketch.py`)
+ - Space-efficient streaming quantile data structure
+ - Maintains ε-approximate quantile summary with O(1/ε log(εn)) space complexity
+
+2. **Streaming KS Test** (`src/core/metrics/drift/kolmogorov_smirnov_streaming.py`)
</code_context>
<issue_to_address>
**nitpick (typo):** Consider adding an article: "an ε-approximate quantile summary" for grammatical correctness.
You could rephrase this as: “Maintains an ε-approximate quantile summary with O(1/ε log(εn)) space complexity.”
```suggestion
- Maintains an ε-approximate quantile summary with O(1/ε log(εn)) space complexity
```
</issue_to_address>
### Comment 11
<location path="docs/implementation_streaming_ks.md" line_range="66" />
<code_context>
+
+1. **Simpler Error Bound**: Single 4ε bound vs. complex δ ≤ 1/(a-1) + ε that depends on number of interpolation points. With Lall's approach, just set ε and the error bound is automatic; Spark's approach requires choosing *a* to balance accuracy vs. space.
+
+2. **Better Space Efficiency**: O(1/ε log(εn)) vs. O(√N) for CDFs. For N=1M, ε=0.01: ~2.2 KB (ours) vs. ~1,000 points (Spark interpolated CDF). Eck et al. explicitly note: "Approximate CDFs scale as O(√N), compared to O(1/ε log(ϵN)) for direct sketch methods."
+
+3. **No Intermediate Representation**: Direct sketch-to-KS computation eliminates the interpolated CDF layer:
</code_context>
<issue_to_address>
**suggestion (typo):** Unify the epsilon symbol in the complexity term (ε vs ϵ) to avoid confusion.
In this bullet you have `O(1/ε log(εn))` earlier and `O(1/ε log(ϵN))` in the quoted text, using `ε` vs `ϵ`. If they represent the same parameter, please standardize on a single glyph for consistency.
```suggestion
2. **Better Space Efficiency**: O(1/ε log(εn)) vs. O(√N) for CDFs. For N=1M, ε=0.01: ~2.2 KB (ours) vs. ~1,000 points (Spark interpolated CDF). Eck et al. explicitly note: "Approximate CDFs scale as O(√N), compared to O(1/ε log(εN)) for direct sketch methods."
```
</issue_to_address>
### Comment 12
<location path="docs/implementation_streaming_ks.md" line_range="145-148" />
<code_context>
+- **Simple, readable code**: Easier to understand, maintain, and debug
+- **Fast iteration**: Compression and quantile queries benefit from linear memory access
+
+**5. When Alternative Data Structures Would Be Beneficial**
+
+Alternative structures like `SortedList` would only provide meaningful benefits if:
+- Summary size exceeds 1,000 tuples (requires ε ≥ 0.1, which is atypical)
+- Inserting billions of elements in tight loops
+- Profiling shows insert operations consuming >80% of runtime
</code_context>
<issue_to_address>
**issue:** The inequality on ε appears inverted relative to the stated condition on summary size.
Given the stated space bound O(1/ε log(εn)), larger ε means fewer tuples. So a summary size above 1,000 tuples would normally correspond to smaller ε, not `ε ≥ 0.1`. Please adjust the inequality (e.g., to `ε ≤ ...`) or reword the condition so it reflects this relationship correctly.
</issue_to_address>
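The direction of the relationship is easy to check numerically. This drops the constant factor in the O((1/ε)·log(εn)) bound (and fixes log base 2 as an assumption), so only the comparison between the two ε values matters:

```python
from math import log2

def gk_tuple_bound(eps: float, n: int) -> float:
    # (1/eps) * log2(eps * n): GK summary-size bound with the constant
    # factor dropped; illustration only, not a precise tuple count.
    return (1.0 / eps) * log2(eps * n)

n = 1_000_000
print(round(gk_tuple_bound(0.1, n)))    # 166 tuples: large eps, tiny summary
print(round(gk_tuple_bound(0.001, n)))  # 9966 tuples: small eps, big summary
```

So a summary above 1,000 tuples goes with small ε, matching the review's point that the doc's `ε ≥ 0.1` inequality is inverted.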
…streaming algorithms for the Kolmogorov-Smirnov test, in: 2015 IEEE International Conference on Big Data (Big Data). Presented at the 2015 IEEE International Conference on Big Data (Big Data), IEEE, Santa Clara, CA, USA, pp. 95–104. https://doi.org/10.1109/BigData.2015.7363746
Add comprehensive implementation documentation explaining design decisions, algorithm details, and performance characteristics for the streaming Kolmogorov-Smirnov test with Greenwald-Khanna quantile sketches. Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
Fix critical bugs, improve performance, add comprehensive tests, and
refine epsilon parameter constraints based on algorithm characteristics.
- Fix ZeroDivisionError when epsilon > 0.5 by constraining epsilon to
(0, 0.5] through validation; with this constraint, compress_period is
guaranteed >= 1, eliminating need for defensive max(1, ...) clamping
- Implement proper merge() that recalculates g and Δ values based on
combined rank bounds, preserving GK error guarantees instead of naive
concatenation that violated invariants
- Fix delete endpoint exception handling to preserve 400 status codes
for invalid UUIDs instead of converting to 500
- Optimize kstest() to avoid redundant statistic computation by making
p_value() accept optional precomputed statistic parameter
- Refine epsilon constraint from (0, 1) to (0, 0.5] based on algorithm
characteristics:
* Values > 0.5 cause compress_period = 0 (undefined behavior)
* Value = 0.5 causes constant compression (inefficient but allowed)
* Add UserWarning when epsilon > 0.4 to inform users
* Remove redundant max(1, ...) clamps since validation ensures
compress_period >= 1
- Add Pydantic Field constraints (gt=0.0, le=0.5) for epsilon validation
at API level, returning 422 for invalid values
- Fix retrieve_tags() to use `is not None` instead of truthy check for
epsilon=0.0 edge case
- Update p_value() docstring to clarify it uses scipy's one-sample KS
distribution as asymptotic approximation, not exact two-sample
- Add GK sketch serialization tests (round-trip and malformed input)
- Add KS streaming serialization tests (round-trip and malformed input)
- Add alpha parameter test verifying drift_detected threshold control
- Add multi-feature endpoint test verifying per-feature aggregation
- Add epsilon boundary tests (0.5 allowed with warning, 0.51 rejected)
- Update invalid UUID test expectation from 500 to 400
- Add comprehensive "Understanding the Error Parameter ε" section
explaining why ε ∈ (0, 0.5] and typical values
- Fix grammar: "Maintains an ε-approximate..." (add article)
- Standardize epsilon symbol (ε not ϵ) throughout
- Fix inequality for summary size > 1,000 tuples (ε ≤ 0.001 not ≥ 0.1)
- Document warning behavior for epsilon > 0.4
- Remove obsolete approx_ks_test.py file (78 lines)
Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
- Relax GK invariant check for tuples with Δ=0 (exact min/max values). When merging sketches with many duplicates, the first/last tuples can have g > floor(2εn) while maintaining Δ=0. Since rank error is bounded by Δ, these tuples still provide exact values (error=0) regardless of g.
- Add type validation in from_dict() to properly reject invalid types. Change the non-dict input exception from ValueError to TypeError for better semantic correctness. Add an explicit check for the summary field type to reject non-list values (previously "not a list" was silently converted to ['n','o','t',...] by the list() constructor).
Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
Bring the streaming KS test implementation in line with the ruff select=["ALL"] baseline from PR #122 and address review findings:
- Use EPSILON_DEFAULT constant as single source of truth across source, endpoint, and test files
- Replace warnings.warn with logger.warning (appropriate for a deployed service where users set epsilon via the API)
- Use dict[str, Any] for from_dict() deserialization parameters
- Use direct Field(default=METRIC_NAME) instead of @model_validator for metric_name
- Use hyphens in code strings, reserve EN dashes for comments/docstrings
- Revert absurd test constants to inline values, keep meaningful epsilon presets
- Switch warning tests from warnings.catch_warnings to caplog
- Remove vestigial pylint disable comments
- Document delete() behavior with duplicate summary entries
- Add per-file-ignores for unicode math notation (RUF001-003) and magic values in tests (PLR2004)
Co-Authored-By: Claude Opus 4.6 <[email protected]>
📒 Files selected for processing (5)
📝 Walkthrough
This PR implements a complete streaming Kolmogorov–Smirnov (KS) drift detection system. It introduces a Greenwald–Khanna quantile sketch for memory-efficient streaming quantile estimation, builds a two-sample KS test on top of it, exposes the feature through FastAPI endpoints, and includes comprehensive test coverage and documentation.
Changes: Streaming KS Drift Detection
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 4
🧹 Nitpick comments (5)
tests/endpoints/metrics/drift/test_kolmogorov_smirnov_streaming.py (1)
125-125: 💤 Low value: Use HTTPStatus.BAD_REQUEST for consistency.
The rest of this module uses HTTPStatus.* (e.g., HTTPStatus.INTERNAL_SERVER_ERROR, HTTPStatus.SERVICE_UNAVAILABLE). The error-test cases above pass the bare integer 400, which is a small consistency gap.
♻️ Proposed change

```diff
-    expected_status_code=400,
+    expected_status_code=HTTPStatus.BAD_REQUEST,
```

Also applies to: 139-139, 153-153, 164-164
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/endpoints/metrics/drift/test_kolmogorov_smirnov_streaming.py` at line 125: replace the literal integer 400 used as expected_status_code in the failing test calls with the HTTPStatus enum for consistency; update the calls in tests/endpoints/metrics/drift/test_kolmogorov_smirnov_streaming.py that pass expected_status_code=400 to use HTTPStatus.BAD_REQUEST instead, and ensure HTTPStatus is imported in that test module if not already referenced.
tests/core/metrics/drift/test_greenwald_khanna_quantile_sketch.py (2)
1164-1164: 💤 Low value: Prefer np.random.default_rng for consistency.
Other tests in this file already use np.random.default_rng(seed). np.random.RandomState is the legacy NumPy API; using a single style reduces churn and is the recommended approach with NumPy ≥ 1.17.
♻️ Proposed change

```diff
-    rng = np.random.RandomState(42)
-    data = rng.normal(loc=0.0, scale=1.0, size=500)
+    rng = np.random.default_rng(42)
+    data = rng.normal(loc=0.0, scale=1.0, size=500)
```

Also applies to: 1208-1208
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/core/metrics/drift/test_greenwald_khanna_quantile_sketch.py` at line 1164, Replace legacy RNG usage: in the test_greenwald_khanna_quantile_sketch tests where rng is created with np.random.RandomState(42) (also the occurrence at the second spot), switch to NumPy's recommended API by calling np.random.default_rng(42) and update any subsequent usage that expects RandomState methods to the default_rng equivalents (e.g., rng.integers, rng.random) so the tests use the modern Generator interface consistently.
1102-1159: 💤 Low value: Stale test comments referring to source line numbers.
This test references "lines 165, 168-170 in the _query_rank method", but the current source no longer has a _query_rank method (the logic was inlined into quantile() at lines 164-190 with simplified branches). The block at lines 1119-1149 documents that the targeted lines are unreachable, then ends with a generic smoke test that doesn't exercise anything specific. Consider trimming this to just the smoke test and removing the obsolete crafted-state setups, which manipulate internals (e.g., sketch._cumulative_r_max = [6, 7]) in ways that would silently mask future regressions.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/core/metrics/drift/test_greenwald_khanna_quantile_sketch.py` around lines 1102-1159: the test test_query_rank_edge_cases_direct contains stale comments and fragile crafted-state manipulations of GreenwaldKhannaSketch internals (_cumulative_r_max, _cumulative_cache_valid, summary, n) that target removed lines in quantile/_query_rank; remove the obsolete setups and explanatory line-number comments and keep only the concise smoke test: construct a sketch, insert 50 sequential values, and assert quantile(phi) stays within [0.0, 49.0] for the chosen phi values, avoiding any direct mutation of _cumulative_r_max or other private internals.
tests/core/metrics/drift/test_kolmogorov_smirnov_streaming.py (1)
104-113: ⚡ Quick win: Add an epsilon-out-of-GK-range case to lock down the validation contract.
Given the inconsistency between this class's (0, 1) bound and the GK sketch's (0, 0.5] (flagged on the implementation), it would be valuable to assert how epsilon=0.7 is rejected. After the implementation tightens the bound, this test will protect against regressions.
♻️ Proposed addition

```diff
     with pytest.raises(ValueError, match="epsilon must be in the range"):
         KolmogorovSmirnovStreaming(epsilon=-0.1)
+
+    # Above the GK sketch's allowed upper bound (0.5)
+    with pytest.raises(ValueError, match="epsilon must be in the range"):
+        KolmogorovSmirnovStreaming(epsilon=0.7)
```
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/core/metrics/drift/test_kolmogorov_smirnov_streaming.py` around lines 104-113: add a test assertion that epsilon values outside the GK sketch valid range are rejected: in test_initialization_invalid_epsilon, include a with pytest.raises(ValueError, match="epsilon must be in the range") block that constructs KolmogorovSmirnovStreaming(epsilon=0.7) so the test documents and protects against allowing epsilon values like 0.7 (outside the GK sketch's (0, 0.5] bound) from being accepted; reference the KolmogorovSmirnovStreaming constructor and the test_initialization_invalid_epsilon test to locate where to add this case.
src/core/metrics/drift/greenwald_khanna_quantile_sketch.py (1)
297-392: ⚡ Quick win: merge() is O((m+n) · max(m,n)) due to repeated linear get_rank_bounds.
get_rank_bounds recomputes r_min = sum(g for _, g, _ in sketch.summary[: idx + 1]) (line 337) on every call, and is invoked O(m+n) times in the merge loop. This degrades the documented O(m+n) merge to roughly O((m+n) · max(m,n)).
Both sketches already have _cumulative_r_max cached via _ensure_cumulative_cache() (called on lines 324-325). You can derive r_min directly from the cache: r_min = _cumulative_r_max[idx] - summary[idx][2]. This makes get_rank_bounds O(1) and merge truly O(m+n).
♻️ Proposed refactor

```diff
     def get_rank_bounds(
         sketch: "GreenwaldKhannaSketch", idx: int
     ) -> tuple[int, int]:
         if idx < 0:
             return 0, 0
         if idx >= len(sketch.summary):
             return sketch.n, sketch.n
-        # r_min is the sum of g values up to and including idx
-        r_min = sum(g for _, g, _ in sketch.summary[: idx + 1])
-        r_max = r_min + sketch.summary[idx][2]
+        # r_max is cached; r_min = r_max - delta
+        delta = sketch.summary[idx][2]
+        r_max = sketch._cumulative_r_max[idx]  # noqa: SLF001
+        r_min = r_max - delta
         return r_min, r_max
```
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/core/metrics/drift/greenwald_khanna_quantile_sketch.py` around lines 297 - 392, The merge() implementation is quadratic because get_rank_bounds recomputes prefix sums with sum(...) each call; instead use the cached cumulative array built by _ensure_cumulative_cache(): in get_rank_bounds(sketch, idx) return (0,0) for idx<0, (sketch.n, sketch.n) for idx>=len(sketch.summary), otherwise compute r_max = sketch._cumulative_r_max[idx] and r_min = r_max - sketch.summary[idx][2]; use those r_min/r_max values to compute combined_r_min/combined_r_max so merge() becomes O(m+n) (update references to get_rank_bounds in merge and keep the _ensure_cumulative_cache() calls).
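The cache idea behind this refactor can be sketched outside the class. Assuming a GK summary is a list of (value, g, delta) tuples, cached r_max values make rank-bound lookups O(1) while agreeing with the linear prefix-sum definition:

```python
from itertools import accumulate

# Hypothetical GK summary: (value, g, delta) tuples.
summary = [(1.0, 1, 0), (3.0, 2, 1), (7.0, 1, 0)]

# Cache r_max for every index: r_max[i] = sum(g[0..i]) + delta[i].
prefix_g = list(accumulate(g for _, g, _ in summary))
cumulative_r_max = [pg + d for pg, (_, _, d) in zip(prefix_g, summary)]

def rank_bounds(idx: int) -> tuple[int, int]:
    """O(1) rank bounds using the cache: r_min = r_max - delta."""
    delta = summary[idx][2]
    r_max = cumulative_r_max[idx]
    return r_max - delta, r_max

def rank_bounds_linear(idx: int) -> tuple[int, int]:
    """The original O(n) prefix-sum definition, for comparison."""
    r_min = sum(g for _, g, _ in summary[: idx + 1])
    return r_min, r_min + summary[idx][2]

assert all(rank_bounds(i) == rank_bounds_linear(i) for i in range(len(summary)))
```
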
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/core/metrics/drift/greenwald_khanna_quantile_sketch.py`:
- Around line 130-148: The delete() logic can leave the new first or last
summary tuple with a non-zero delta, breaking the GK invariant that boundary
tuples must have delta==0; update the code in delete() (referencing
self.summary, pos, and the branches for pos==0 and pos==len(self.summary)-1) so
that after removing the minimum (pos==0) you set the new first tuple's delta to
0 (i.e., when assigning self.summary[1] or after pop ensure self.summary[0] =
(v, g, 0)), and symmetrically after removing the maximum (pop at end) set the
new last tuple's delta to 0; keep the existing g transfer (g_next + g_pos - 1)
but clamp/reset delta to 0 for the new boundary tuples so
check_gk_invariants/min()/max() remain correct.
In `@src/core/metrics/drift/kolmogorov_smirnov_streaming.py`:
- Around line 49-53: The epsilon validation in kolmogorov_smirnov_streaming (the
check around epsilon and its docstring) is looser than
GreenwaldKhannaSketch.__init__'s contract and causes confusing errors; update
the docstring and validation to match GreenwaldKhannaSketch's accepted range (0,
0.5] or remove the duplicate check so the GreenwaldKhannaSketch.__init__ error
is authoritative—ensure any boundary warnings (e.g. the >0.4 advisory) remain
consistent with GK's logic and reference the same epsilon variable and
GreenwaldKhannaSketch class in the file.
- Around line 186-194: The code computes n_eff and passes round(n_eff) into
kstwo.sf, but Python's round uses banker's rounding so values like 0.5 become 0
and kstwo.sf cannot accept n=0; change the argument to kstwo.sf(d_stat, max(1,
int(round(n_eff)))) (i.e., coerce to int and guard with max(1,...)) where n_eff
is computed and kstwo.sf is called to ensure the effective sample size is at
least 1.
In `@src/endpoints/metrics/drift/kolmogorov_smirnov_streaming.py`:
- Around line 48-84: Add a Pydantic post-validation method on
ApproxKSTestMetricRequest using `@model_validator`(mode="after") that
unconditionally sets self.metric_name = METRIC_NAME (ensuring any
client-supplied metricName is overridden), import model_validator from pydantic
if needed, and then remove the redundant manual assignment of metric_name in
schedule_ksteststreaming so the model enforces the canonical metric name
centrally.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 25c98550-6879-437b-a023-9a5d5c631a40
📒 Files selected for processing (10)
docs/implementation_streaming_ks.md
pyproject.toml
src/core/metrics/drift/greenwald_khanna_quantile_sketch.py
src/core/metrics/drift/kolmogorov_smirnov_streaming.py
src/endpoints/metrics/drift/approx_ks_test.py
src/endpoints/metrics/drift/kolmogorov_smirnov_streaming.py
src/main.py
tests/core/metrics/drift/test_greenwald_khanna_quantile_sketch.py
tests/core/metrics/drift/test_kolmogorov_smirnov_streaming.py
tests/endpoints/metrics/drift/test_kolmogorov_smirnov_streaming.py
💤 Files with no reviewable changes (1)
- src/endpoints/metrics/drift/approx_ks_test.py
```python
class ApproxKSTestMetricRequest(BaseMetricRequest):
    """Request parameters for streaming KS test drift detection metric."""

    # Use field aliases to accept camelCase from API while keeping snake_case internally
    model_config = ConfigDict(populate_by_name=True)

    model_id: str = Field(alias="modelId")
    metric_name: str = Field(default=METRIC_NAME, alias="metricName")
    request_name: str | None = Field(default=None, alias="requestName")
    batch_size: int = Field(default=100, alias="batchSize")

    # ApproxKSTest-specific fields
    threshold_delta: float = Field(default=0.05, alias="thresholdDelta")
    reference_tag: str | None = Field(default=None, alias="referenceTag")
    fit_columns: list[str] = Field(default_factory=list, alias="fitColumns")

    # Streaming-specific field: epsilon for GK sketch accuracy
    # Must be in the interval (0, 0.5] to satisfy GK sketch requirements.
    # Values close to 0.5 cause algorithm degeneration (constant compression).
    epsilon: float = Field(
        default=EPSILON_DEFAULT,
        gt=0.0,
        le=0.5,
        description="Error parameter for GK sketch; must be in (0, 0.5]. Default: 0.01",
    )

    def retrieve_tags(self) -> dict[str, str]:
        """Retrieve tags for this ApproxKSTest metric request."""
        tags = self.retrieve_default_tags()
        if self.reference_tag:
            tags["referenceTag"] = self.reference_tag
        if self.fit_columns:
            tags["fitColumns"] = ",".join(self.fit_columns)
        # Always reflect explicitly configured epsilon, even if it's 0.0
        if self.epsilon is not None:
            tags["epsilon"] = str(self.epsilon)
        return tags
```
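Because of the populate_by_name aliases above, clients send camelCase JSON. A hypothetical request body for POST /metrics/drift/ksteststreaming (field values are illustrative):

```python
import json

payload = {
    "modelId": "my-model",          # required
    "referenceTag": "TRAINING",     # selects the reference dataframe
    "fitColumns": ["feature_a"],    # features to test (illustrative names)
    "thresholdDelta": 0.05,
    "epsilon": 0.01,                # GK sketch error; must be in (0, 0.5]
}
body = json.dumps(payload)
print(json.loads(body)["epsilon"])  # 0.01
```
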
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Add @model_validator(mode="after") to enforce metric_name
The coding guidelines require @model_validator(mode="after") to auto-set metric_name. Using only Field(default=METRIC_NAME) leaves the door open for a client to pass "metricName": "anything" and have that value stored in request.metric_name. The schedule handler works around this with a manual override at line 232, but the compute endpoint has no such guard, and any future handler that reads request.metric_name would see the client-supplied value.
♻️ Proposed fix
```diff
-from pydantic import BaseModel, ConfigDict, Field
+from pydantic import BaseModel, ConfigDict, Field, model_validator
```

```diff
     epsilon: float = Field(
         default=EPSILON_DEFAULT,
         gt=0.0,
         le=0.5,
         description="Error parameter for GK sketch; must be in (0, 0.5]. Default: 0.01",
     )
+
+    @model_validator(mode="after")
+    def _set_metric_name(self) -> "ApproxKSTestMetricRequest":
+        self.metric_name = METRIC_NAME
+        return self

     def retrieve_tags(self) -> dict[str, str]:
```
def retrieve_tags(self) -> dict[str, str]:With the validator in place, remove the now-redundant manual assignment in schedule_ksteststreaming:
```diff
-    # Set metric name automatically
-    request.metric_name = METRIC_NAME
-
     # Register with the scheduler (this will reconcile the request and store it)
     await scheduler.register(request.metric_name, request_id, request)
```
await scheduler.register(request.metric_name, request_id, request)As per coding guidelines: "Endpoint handlers should use per-metric Pydantic request models inheriting BaseMetricRequest, with @model_validator(mode="after") to auto-set metric_name".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/endpoints/metrics/drift/kolmogorov_smirnov_streaming.py` around lines 48
- 84, Add a Pydantic post-validation method on ApproxKSTestMetricRequest using
`@model_validator`(mode="after") that unconditionally sets self.metric_name =
METRIC_NAME (ensuring any client-supplied metricName is overridden), import
model_validator from pydantic if needed, and then remove the redundant manual
assignment of metric_name in schedule_ksteststreaming so the model enforces the
canonical metric name centrally.
Bug fixes:
- Fix delete() to reset delta=0 on new boundary tuples, preserving the GK invariant that first/last tuples must have delta==0
- Tighten KS epsilon validation from (0, 1) to (0, 0.5] to match the GK sketch contract and avoid confusing cascaded errors
- Guard kstwo.sf effective sample size with max(1, ...) to prevent banker's rounding from producing n=0

Performance:
- Fix merge() get_rank_bounds to use cached _cumulative_r_max for O(1) lookups instead of recomputing prefix sums, making merge O(m+n)

Cleanup:
- Replace bare 400 with HTTPStatus.BAD_REQUEST in endpoint tests
- Replace legacy np.random.RandomState with np.random.default_rng
- Remove stale test_query_rank_edge_cases_direct internal manipulation
- Add epsilon=0.7 test to lock down KS validation contract

Co-Authored-By: Claude Opus 4.6 <[email protected]>
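The max(1, ...) guard in this commit exists because Python's round() uses half-to-even ("banker's") rounding, so an effective sample size of exactly 0.5 rounds down to 0, which is not a valid sample size:

```python
# Half-to-even rounding: 0.5 -> 0 but 1.5 -> 2.
print(round(0.5), round(1.5))  # 0 2

n_eff = 0.5
n = max(1, int(round(n_eff)))  # guard: effective sample size is at least 1
print(n)  # 1
```
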
Overview
This PR implements a true streaming two-sample Kolmogorov-Smirnov (KS) test for distribution drift detection using Greenwald-Khanna (GK) quantile sketches, based on Lall (2015).
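For intuition, the exact two-sample KS statistic is the maximum gap between the two empirical CDFs over the pooled sample; the streaming variant in this PR replaces the exact CDFs with ε-approximate CDFs served by GK sketches. A minimal exact version (illustrative only, not the PR's implementation):

```python
import bisect

def ks_statistic(a: list[float], b: list[float]) -> float:
    """Exact two-sample KS statistic: max gap between empirical CDFs."""
    a, b = sorted(a), sorted(b)
    d = 0.0
    for x in sorted(set(a) | set(b)):
        f_a = bisect.bisect_right(a, x) / len(a)
        f_b = bisect.bisect_right(b, x) / len(b)
        d = max(d, abs(f_a - f_b))
    return d

print(ks_statistic([0.0, 0.0, 0.0], [1.0, 1.0, 1.0]))  # 1.0
```
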
JIRA: RHOAIENG-46252
Key benefits:
- delete(), merge(), serialization

What's New
Components:
- greenwald_khanna_quantile_sketch.py - Space-efficient streaming quantile sketch
- kolmogorov_smirnov_streaming.py - Streaming two-sample KS test with O(m+n) merge-scan
- endpoints/metrics/drift/kolmogorov_smirnov_streaming.py - REST API at /metrics/drift/ksteststreaming

Testing:
- Comparison with scipy.stats.ks_2samp

Why This Approach?
We chose Lall's (2015) direct sketch-based method over alternatives (previous NumPy quantile approach and Spark's interpolated CDF method) because:
- delete() for sliding windows, merge() for distributed processing
- scipy.stats.ks_2samp is for exact tests; we need memory-efficient streaming

See docs/implementation_streaming_ks.md for detailed comparison and design rationale.

Usage
REST API:
/metrics/drift/ksteststreaming (old /metrics/drift/approxkstest deprecated but functional)

Documentation
See docs/implementation_streaming_ks.md for:

References
Summary by Sourcery
Introduce a streaming two-sample Kolmogorov–Smirnov drift detection implementation based on Greenwald–Khanna quantile sketches, with full API exposure and tests.
New Features:
Enhancements:
Tests:
Summary by CodeRabbit
New Features
Documentation