@@ -1,7 +1,5 @@
from dataclasses import dataclass
import pandas as pd
from sklearn.metrics import adjusted_mutual_info_score, mutual_info_score
from scipy import stats

from ..common.examples import ExampleExtractor
from ...ml_worker.testing.registry.slicing_function import SlicingFunction
@@ -12,6 +10,7 @@
from ...models.base import BaseModel
from ..registry import Detector
from ..decorators import detector
from ...testing.tests.statistic import _cramer_v, _mutual_information, _theil_u


@detector(name="spurious_correlation", tags=["spurious_correlation", "classification"])
@@ -72,7 +71,14 @@ def run(self, model: BaseModel, dataset: Dataset):

if metric_value > self.threshold:
predictions = dx[dx.feature > 0].prediction.value_counts(normalize=True)
info = SpuriousCorrelationInfo(col, slice_fn, metric_value, measure_name, predictions)
info = SpuriousCorrelationInfo(
feature=col,
slice_fn=slice_fn,
metric_value=metric_value,
metric_name=measure_name,
threshold=self.threshold,
predictions=predictions,
)
issues.append(SpuriousCorrelationIssue(model, dataset, "info", info))

return issues
@@ -87,25 +93,13 @@ def _get_measure_fn(self):
raise ValueError(f"Unknown method `{self.method}`")


def _cramer_v(x, y):
ct = pd.crosstab(x, y)
return stats.contingency.association(ct, method="cramer")


def _mutual_information(x, y):
return adjusted_mutual_info_score(x, y)


def _theil_u(x, y):
return mutual_info_score(x, y) / stats.entropy(pd.Series(y).value_counts(normalize=True))


@dataclass
class SpuriousCorrelationInfo:
feature: str
slice_fn: SlicingFunction
metric_value: float
metric_name: str
threshold: float
predictions: pd.DataFrame


@@ -149,3 +143,41 @@ def examples(self, n=3):
@property
def importance(self) -> float:
return self.info.metric_value

def generate_tests(self, with_names=False) -> list:
test_fn = _metric_to_test_object(self.info.metric_name)

if test_fn is None:
return []

tests = [
test_fn(
model=self.model,
dataset=self.dataset,
slicing_function=self.info.slice_fn,
threshold=self.info.threshold,
)
]

if with_names:
names = [f"{self.info.metric_name} on data slice “{self.info.slice_fn}”"]
return list(zip(tests, names))

return tests


_metric_test_mapping = {
"Cramer's V": "test_cramer_v",
"Mutual information": "test_mutual_information",
"Theil's U": "test_theil_u",
}


def _metric_to_test_object(metric_name):
from ...testing.tests import statistic

try:
test_name = _metric_test_mapping[metric_name]
return getattr(statistic, test_name)
except (KeyError, AttributeError):
return None
python-client/giskard/testing/tests/statistic.py (216 changes: 176 additions & 40 deletions)
@@ -17,12 +17,12 @@
@test(name="Right Label", tags=["heuristic", "classification"])
@validate_classification_label
def test_right_label(
model: BaseModel,
dataset: Dataset,
classification_label: str,
slicing_function: Optional[SlicingFunction] = None,
threshold: float = 0.5,
debug: bool = False
model: BaseModel,
dataset: Dataset,
classification_label: str,
slicing_function: Optional[SlicingFunction] = None,
threshold: float = 0.5,
debug: bool = False,
) -> TestResult:
"""
Summary: Test if the model returns the right classification label for a slice
@@ -77,25 +77,20 @@ def test_right_label(
output_ds.name = debug_prefix + test_name
# ---

return TestResult(
actual_slices_size=[len(dataset)],
metric=passed_ratio,
passed=passed,
output_df=output_ds
)
return TestResult(actual_slices_size=[len(dataset)], metric=passed_ratio, passed=passed, output_df=output_ds)


@test(name="Output in range", tags=["heuristic", "classification", "regression"])
@validate_classification_label
def test_output_in_range(
model: BaseModel,
dataset: Dataset,
slicing_function: Optional[SlicingFunction] = None,
classification_label: Optional[str] = None,
min_range: float = 0.3,
max_range: float = 0.7,
threshold: float = 0.5,
debug: bool = False
model: BaseModel,
dataset: Dataset,
slicing_function: Optional[SlicingFunction] = None,
classification_label: Optional[str] = None,
min_range: float = 0.3,
max_range: float = 0.7,
threshold: float = 0.5,
debug: bool = False,
) -> TestResult:
"""
Summary: Test if the model output belongs to the right range for a slice
@@ -171,25 +166,20 @@ def test_output_in_range(
output_ds.name = debug_prefix + test_name
# ---

return TestResult(
actual_slices_size=[len(dataset)],
metric=passed_ratio,
passed=passed,
output_df=output_ds
)
return TestResult(actual_slices_size=[len(dataset)], metric=passed_ratio, passed=passed, output_df=output_ds)


@test(name="Disparate impact", tags=["heuristic", "classification"])
def test_disparate_impact(
model: BaseModel,
dataset: Dataset,
protected_slicing_function: SlicingFunction,
unprotected_slicing_function: SlicingFunction,
positive_outcome: str,
slicing_function: Optional[SlicingFunction] = None,
min_threshold: float = 0.8,
max_threshold: float = 1.25,
debug: bool = False,
model: BaseModel,
dataset: Dataset,
protected_slicing_function: SlicingFunction,
unprotected_slicing_function: SlicingFunction,
positive_outcome: str,
slicing_function: Optional[SlicingFunction] = None,
min_threshold: float = 0.8,
max_threshold: float = 1.25,
debug: bool = False,
) -> TestResult:
"""
Summary: Tests if the model is biased more towards an unprotected slice of the dataset over a protected slice.
@@ -296,9 +286,155 @@ def test_disparate_impact(
output_ds.name = debug_prefix + test_name
# ---

return TestResult(
metric=disparate_impact_score,
passed=passed,
messages=messages,
output_df=output_ds
return TestResult(metric=disparate_impact_score, passed=passed, messages=messages, output_df=output_ds)


def _cramer_v(x, y):
import pandas as pd
from scipy import stats

ct = pd.crosstab(x, y)
return stats.contingency.association(ct, method="cramer")


@test(name="Cramer's V", tags=["statistic", "classification"])
def test_cramer_v(
model: BaseModel, dataset: Dataset, slicing_function: SlicingFunction, threshold: float = 0.5, debug: bool = False
) -> TestResult:
"""
TBF
:param model:
:param dataset:
:param slicing_function:
:param threshold:
:param debug:
:return:
"""
import pandas as pd

sliced_dataset = dataset.slice(slicing_function)
check_slice_not_empty(sliced_dataset=sliced_dataset, dataset_name="dataset", test_name="test_cramer_v")

dx = pd.DataFrame(
{
"slice": dataset.df.index.isin(sliced_dataset.df.index).astype(int),
"prediction": model.predict(dataset).prediction,
},
index=dataset.df.index,
)
dx.dropna(inplace=True)

metric = _cramer_v(dx.slice, dx.prediction)
passed = metric < threshold

# --- debug ---
output_ds = None
if not passed and debug:
output_ds = sliced_dataset.copy() # copy all properties
test_name = inspect.stack()[0][3]
output_ds.name = debug_prefix + test_name
# ---

messages = [TestMessage(type=TestMessageLevel.INFO, text=f"metric = {metric}, threshold = {threshold}")]

return TestResult(metric=metric, passed=passed, messages=messages, output_df=output_ds)


def _mutual_information(x, y):
from sklearn.metrics import adjusted_mutual_info_score

return adjusted_mutual_info_score(x, y)


@test(name="Mutual Information", tags=["statistic", "classification"])
def test_mutual_information(
model: BaseModel, dataset: Dataset, slicing_function: SlicingFunction, threshold: float = 0.5, debug: bool = False
) -> TestResult:
"""
TBF
:param model:
:param dataset:
:param slicing_function:
:param threshold:
:param debug:
:return:
"""
import pandas as pd

sliced_dataset = dataset.slice(slicing_function)
check_slice_not_empty(sliced_dataset=sliced_dataset, dataset_name="dataset", test_name="test_mutual_information")

dx = pd.DataFrame(
{
"slice": dataset.df.index.isin(sliced_dataset.df.index).astype(int),
"prediction": model.predict(dataset).prediction,
},
index=dataset.df.index,
)
dx.dropna(inplace=True)

metric = _mutual_information(dx.slice, dx.prediction)
passed = metric < threshold

# --- debug ---
output_ds = None
if not passed and debug:
output_ds = sliced_dataset.copy() # copy all properties
test_name = inspect.stack()[0][3]
output_ds.name = debug_prefix + test_name
# ---

messages = [TestMessage(type=TestMessageLevel.INFO, text=f"metric = {metric}, threshold = {threshold}")]

return TestResult(metric=metric, passed=passed, messages=messages, output_df=output_ds)


def _theil_u(x, y):
import pandas as pd
from sklearn.metrics import mutual_info_score
from scipy import stats

return mutual_info_score(x, y) / stats.entropy(pd.Series(y).value_counts(normalize=True))


@test(name="Theil's U", tags=["statistic", "classification"])
def test_theil_u(
model: BaseModel, dataset: Dataset, slicing_function: SlicingFunction, threshold: float = 0.5, debug: bool = False
) -> TestResult:
"""
TBF
:param model:
:param dataset:
:param slicing_function:
:param threshold:
:param debug:
:return:
"""
import pandas as pd

sliced_dataset = dataset.slice(slicing_function)
check_slice_not_empty(sliced_dataset=sliced_dataset, dataset_name="dataset", test_name="test_theil_u")

dx = pd.DataFrame(
{
"slice": dataset.df.index.isin(sliced_dataset.df.index).astype(int),
"prediction": model.predict(dataset).prediction,
},
index=dataset.df.index,
)
dx.dropna(inplace=True)

metric = _theil_u(dx.slice, dx.prediction)
passed = metric < threshold

# --- debug ---
output_ds = None
if not passed and debug:
output_ds = sliced_dataset.copy() # copy all properties
test_name = inspect.stack()[0][3]
output_ds.name = debug_prefix + test_name
# ---

messages = [TestMessage(type=TestMessageLevel.INFO, text=f"metric = {metric}, threshold = {threshold}")]

return TestResult(metric=metric, passed=passed, messages=messages, output_df=output_ds)
Member:
The only difference with the previous test is that we call _theil_u instead of _mutual_information. This could be refactored into a single test_nominal_association with a method attribute (e.g. theil or mutual_info) to avoid code repetition.
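
A minimal sketch of that refactoring, reusing only helpers and decorators already present in statistic.py (test, check_slice_not_empty, debug_prefix, _cramer_v, _mutual_information, _theil_u); the function name test_nominal_association and its method parameter are hypothetical names taken from this comment, not from the current code:

# Sketch only: one parametrized test replacing test_cramer_v, test_mutual_information and test_theil_u.
_NOMINAL_ASSOCIATION_METHODS = {
    "cramer_v": _cramer_v,
    "mutual_information": _mutual_information,
    "theil_u": _theil_u,
}


@test(name="Nominal association", tags=["statistic", "classification"])
def test_nominal_association(
    model: BaseModel,
    dataset: Dataset,
    slicing_function: SlicingFunction,
    method: str = "theil_u",
    threshold: float = 0.5,
    debug: bool = False,
) -> TestResult:
    import pandas as pd

    if method not in _NOMINAL_ASSOCIATION_METHODS:
        raise ValueError(f"Unknown method `{method}`")

    sliced_dataset = dataset.slice(slicing_function)
    check_slice_not_empty(sliced_dataset=sliced_dataset, dataset_name="dataset", test_name="test_nominal_association")

    # Binary indicator of slice membership vs. model predictions, as in the three tests above.
    dx = pd.DataFrame(
        {
            "slice": dataset.df.index.isin(sliced_dataset.df.index).astype(int),
            "prediction": model.predict(dataset).prediction,
        },
        index=dataset.df.index,
    )
    dx.dropna(inplace=True)

    metric = _NOMINAL_ASSOCIATION_METHODS[method](dx.slice, dx.prediction)
    passed = metric < threshold

    # --- debug ---
    output_ds = None
    if not passed and debug:
        output_ds = sliced_dataset.copy()  # copy all properties
        output_ds.name = debug_prefix + "test_nominal_association"
    # ---

    messages = [TestMessage(type=TestMessageLevel.INFO, text=f"metric = {metric}, threshold = {threshold}")]

    return TestResult(metric=metric, passed=passed, messages=messages, output_df=output_ds)

The spurious correlation detector's _metric_test_mapping would then point all three metric names at this single test, passing the matching method value instead of selecting a separate test function.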