From 6c82ee7760ea6a54a3cc49a61ef72a8f60fc8748 Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Tue, 25 Nov 2025 10:24:34 +0000 Subject: [PATCH 01/14] Add failing test for - gliner truncates text and misses names (PII) --- .../tests/test_gliner_token_truncation.py | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 presidio-image-redactor/tests/test_gliner_token_truncation.py diff --git a/presidio-image-redactor/tests/test_gliner_token_truncation.py b/presidio-image-redactor/tests/test_gliner_token_truncation.py new file mode 100644 index 000000000..8dbf9adc9 --- /dev/null +++ b/presidio-image-redactor/tests/test_gliner_token_truncation.py @@ -0,0 +1,84 @@ +"""Test GLiNER token truncation behavior with entities beyond 384 token limit.""" +import pytest +from PIL import Image, ImageDraw, ImageFont +from presidio_analyzer import AnalyzerEngine, RecognizerRegistry +from presidio_image_redactor import ImageAnalyzerEngine +from presidio_analyzer.predefined_recognizers import GLiNERRecognizer + + + +@pytest.fixture(scope="module") +def mock_image_with_late_entities(): + """Create a test image with person names before and after the 384 token limit.""" + img = Image.new('RGB', (1000, 1400), color='white') + draw = ImageDraw.Draw(img) + font = ImageFont.load_default() + + text_lines = [ + "Patient: Dr. Amanda Williams", + "Doctor: Dr. James Patterson", + "", + ] + + # Add filler text to exceed 384 token limit + text_lines.extend([ + f"Par{i}: Medical documentation regarding treatment protocols. " + f"The facility maintains comprehensive records of consultations. " + f"Standard procedures require detailed documentation." 
+ for i in range(60) + ]) + + # Add names beyond token limit + text_lines.extend(["", "Nurse: Jennifer Anderson", "Therapist: Christopher Davis"]) + + # Draw text on image + y = 30 + for line in text_lines: + draw.text((30, y), line, fill='black', font=font) + y += 20 + + return img + +def extract_detected_names(results, ocr_text, expected_names): + """Extract which expected names were detected from analyzer results.""" + detected_names = set() + + for result in results: + if result.start < len(ocr_text) and result.end <= len(ocr_text): + context = ocr_text[max(0, result.start - 20):min(len(ocr_text), result.end + 20)] + for name in expected_names: + if name in context or all(part in context for part in name.split()): + detected_names.add(name) + + return detected_names + +def test_gliner_truncates_entities_beyond_384_tokens(mock_image_with_late_entities): + """Test that GLiNER detects early names but misses names beyond 384 token limit.""" + # Setup analyzer with only GLiNER recognizer + registry = RecognizerRegistry() + registry.add_recognizer(GLiNERRecognizer()) + analyzer = AnalyzerEngine(registry=registry) + + # Analyze the image + image_analyzer = ImageAnalyzerEngine(analyzer_engine=analyzer) + results = image_analyzer.analyze(mock_image_with_late_entities) + + # Get OCR text for name extraction + ocr_text = image_analyzer.ocr.get_text_from_ocr_dict( + image_analyzer.ocr.perform_ocr(mock_image_with_late_entities) + ) + + # Extract detected names + expected_names = ["Amanda Williams", "James Patterson", "Jennifer Anderson", "Christopher Davis"] + detected_names = extract_detected_names(results, ocr_text, expected_names) + + for name in expected_names: + print(f" {'✅' if name in detected_names else '❌'} {name}") + + # Assert early names are detected + assert "Amanda Williams" in detected_names, "Early name should be detected" + assert "James Patterson" in detected_names, "Early name should be detected" + + # Assert late names are detected (will fail due to 
truncation) + assert "Jennifer Anderson" in detected_names, "Late name missed" + assert "Christopher Davis" in detected_names, "Late name missed" From b04d9c72de58b4b1391fd0901c502d414bf712e3 Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Tue, 25 Nov 2025 13:55:12 +0000 Subject: [PATCH 02/14] Update gliner recognizer to implement basic chunking --- .../ner/gliner_recognizer.py | 124 +++++++++++++++++- .../tests/test_gliner_recognizer.py | 119 +++++++++++++++++ 2 files changed, 236 insertions(+), 7 deletions(-) diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py index 13523f5c0..f17a13da6 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py @@ -35,6 +35,8 @@ def __init__( multi_label: bool = False, threshold: float = 0.30, map_location: str = "cpu", + chunk_size: int = 250, + chunk_overlap: int = 50, ): """GLiNER model based entity recognizer. @@ -54,6 +56,11 @@ def __init__( :param threshold: The threshold for the model's output (see GLiNER's documentation) :param map_location: The device to use for the model + :param chunk_size: Maximum character length for text chunks. + Text longer than this will be split into chunks to avoid token truncation. + Default is 250 characters, matching gliner-spacy implementation. + :param chunk_overlap: Number of characters to overlap between chunks. + Overlap helps detect entities at chunk boundaries. Default is 50 characters. 
""" @@ -86,6 +93,8 @@ def __init__( self.flat_ner = flat_ner self.multi_label = multi_label self.threshold = threshold + self.chunk_size = chunk_size + self.chunk_overlap = chunk_overlap self.gliner = None @@ -121,13 +130,40 @@ def analyze( # combine the input labels as this model allows for ad-hoc labels labels = self.__create_input_labels(entities) - predictions = self.gliner.predict_entities( - text=text, - labels=labels, - flat_ner=self.flat_ner, - threshold=self.threshold, - multi_label=self.multi_label, - ) + # For short text, process directly + if len(text) <= self.chunk_size: + predictions = self.gliner.predict_entities( + text=text, + labels=labels, + flat_ner=self.flat_ner, + threshold=self.threshold, + multi_label=self.multi_label, + ) + else: + # Chunk long text and process each chunk + chunks = self._chunk_text(text) + predictions = [] + offset = 0 + + for chunk in chunks: + chunk_predictions = self.gliner.predict_entities( + text=chunk, + labels=labels, + flat_ner=self.flat_ner, + threshold=self.threshold, + multi_label=self.multi_label, + ) + # Adjust offsets to match original text position + for pred in chunk_predictions: + pred["start"] += offset + pred["end"] += offset + + predictions.extend(chunk_predictions) + offset += len(chunk) - self.chunk_overlap + + # Remove duplicate entities from overlapping chunks + predictions = self._deduplicate_predictions(predictions) + recognizer_results = [] for prediction in predictions: presidio_entity = self.model_to_presidio_entity_mapping.get( @@ -154,6 +190,80 @@ def analyze( return recognizer_results + def _chunk_text(self, text: str) -> List[str]: + """Split text into overlapping chunks at word boundaries. 
+ + Based on gliner-spacy chunking strategy with overlap to catch entities + at chunk boundaries: + https://github.com/theirstory/gliner-spacy/blob/main/gliner_spacy/pipeline.py#L60-L96 + + :param text: The full text to chunk + :return: List of overlapping text chunks + """ + chunks = [] + start = 0 + + while start < len(text): + # Calculate end position + end = ( + start + self.chunk_size if start + self.chunk_size < len(text) else len(text) + ) + + # Ensure the chunk ends at a complete word + while end < len(text) and text[end] not in [" ", "\n"]: + end += 1 + + chunks.append(text[start:end]) + + # Move start position with overlap (stop if we've covered all text) + if end >= len(text): + break + start = end - self.chunk_overlap + + return chunks + + def _deduplicate_predictions(self, predictions: List[Dict]) -> List[Dict]: + """Remove duplicate entities from overlapping chunks. + + Two entities are considered duplicates if they overlap significantly. + Keeps the entity with the highest score. 
+ + :param predictions: List of entity predictions with start, end, label, score + :return: Deduplicated list of predictions + """ + if not predictions: + return predictions + + # Sort by score descending to keep highest scoring entities + sorted_preds = sorted(predictions, key=lambda p: p["score"], reverse=True) + unique = [] + + for pred in sorted_preds: + # Check if this prediction overlaps significantly with any kept prediction + is_duplicate = False + for kept in unique: + # Check if same entity type and overlapping positions + if pred["label"] == kept["label"]: + overlap_start = max(pred["start"], kept["start"]) + overlap_end = min(pred["end"], kept["end"]) + + if overlap_start < overlap_end: + # Calculate overlap ratio + overlap_len = overlap_end - overlap_start + pred_len = pred["end"] - pred["start"] + kept_len = kept["end"] - kept["start"] + + # If >50% overlap, consider duplicate + if overlap_len / min(pred_len, kept_len) > 0.5: + is_duplicate = True + break + + if not is_duplicate: + unique.append(pred) + + # Sort by position for consistent output + return sorted(unique, key=lambda p: p["start"]) + def __create_input_labels(self, entities): """Append the entities requested by the user to the list of labels if it's not there.""" # noqa: E501 labels = self.gliner_labels diff --git a/presidio-analyzer/tests/test_gliner_recognizer.py b/presidio-analyzer/tests/test_gliner_recognizer.py index b78f731f7..916ecd91f 100644 --- a/presidio-analyzer/tests/test_gliner_recognizer.py +++ b/presidio-analyzer/tests/test_gliner_recognizer.py @@ -131,3 +131,122 @@ def test_analyze_with_no_entities(mock_gliner): # Should return no results assert len(results) == 0 + + +def test_gliner_handles_long_text_with_chunking(mock_gliner): + """Test that GLiNER chunks long text and adjusts entity offsets correctly.""" + if sys.version_info < (3, 10): + pytest.skip("gliner requires Python >= 3.10") + + text = "John Smith lives here. " + ("x " * 120) + "Jane Doe works there." 
+ + # Mock returns entities with positions relative to each chunk + def mock_predict_entities(text, labels, flat_ner, threshold, multi_label): + entities = [] + if "John Smith" in text: + start = text.find("John Smith") + entities.append({"label": "person", "start": start, "end": start + 10, "score": 0.95}) + if "Jane Doe" in text: + start = text.find("Jane Doe") + entities.append({"label": "person", "start": start, "end": start + 8, "score": 0.93}) + return entities + + mock_gliner.predict_entities.side_effect = mock_predict_entities + + gliner_recognizer = GLiNERRecognizer( + entity_mapping={"person": "PERSON"}, + chunk_size=250, + ) + gliner_recognizer.gliner = mock_gliner + + results = gliner_recognizer.analyze(text, ["PERSON"]) + + # Verify chunking occurred (predict_entities called multiple times) + assert mock_gliner.predict_entities.call_count == 2, f"Expected 2 chunks, got {mock_gliner.predict_entities.call_count}" + + # Verify exactly 2 entities were detected + assert len(results) == 2, f"Expected 2 entities, found {len(results)}" + + # Verify both entities have correct offsets in original text + assert text[results[0].start:results[0].end] == "John Smith" + assert results[0].entity_type == "PERSON" + assert results[0].score == 0.95 + + assert text[results[1].start:results[1].end] == "Jane Doe" + assert results[1].entity_type == "PERSON" + assert results[1].score == 0.93 + + +def test_gliner_detects_entity_split_across_chunk_boundary(mock_gliner): + """Test that overlap catches entities split at chunk boundaries.""" + if sys.version_info < (3, 10): + pytest.skip("gliner requires Python >= 3.10") + + # Entity "Amanda Williams" will be split: "Amanda" at end of chunk 1, "Williams" at start of chunk 2 + # With 50-char overlap, both parts should be in the overlapping region + text = ("x " * 100) + "Amanda Williams" + (" x" * 100) + + def mock_predict_entities(text, labels, flat_ner, threshold, multi_label): + entities = [] + if "Amanda Williams" in text: + 
start = text.find("Amanda Williams") + entities.append({"label": "person", "start": start, "end": start + 15, "score": 0.92}) + return entities + + mock_gliner.predict_entities.side_effect = mock_predict_entities + + gliner_recognizer = GLiNERRecognizer( + entity_mapping={"person": "PERSON"}, + chunk_size=250, + chunk_overlap=50, + ) + gliner_recognizer.gliner = mock_gliner + + results = gliner_recognizer.analyze(text, ["PERSON"]) + + # Verify entity at boundary was detected + assert len(results) == 1, f"Expected 1 entity, found {len(results)}" + assert text[results[0].start:results[0].end] == "Amanda Williams" + assert results[0].entity_type == "PERSON" + + +def test_gliner_deduplicates_entities_in_overlap_region(mock_gliner): + """Test that duplicate entities from overlapping chunks are removed.""" + if sys.version_info < (3, 10): + pytest.skip("gliner requires Python >= 3.10") + + # Create text where entity appears in overlap region of both chunks + text = ("x " * 95) + "Dr. Smith" + (" x" * 100) + + call_count = 0 + def mock_predict_entities(text, labels, flat_ner, threshold, multi_label): + nonlocal call_count + call_count += 1 + entities = [] + if "Dr. Smith" in text: + start = text.find("Dr. 
Smith") + # Return slightly different scores to test that highest is kept + score = 0.95 if call_count == 1 else 0.90 + entities.append({"label": "person", "start": start, "end": start + 9, "score": score}) + return entities + + mock_gliner.predict_entities.side_effect = mock_predict_entities + + gliner_recognizer = GLiNERRecognizer( + entity_mapping={"person": "PERSON"}, + chunk_size=250, + chunk_overlap=50, + ) + gliner_recognizer.gliner = mock_gliner + + results = gliner_recognizer.analyze(text, ["PERSON"]) + + # Verify: Called multiple times due to overlap + assert mock_gliner.predict_entities.call_count >= 2, "Should process multiple chunks" + + # Verify: Only 1 result after deduplication (not 2) + assert len(results) == 1, f"Expected 1 deduplicated entity, found {len(results)}" + + # Verify: Kept the one with highest score (0.95 from first chunk) + assert results[0].score == 0.95 + assert text[results[0].start:results[0].end] == "Dr. Smith" From e0eb74578103bc7c6ee308678be17a6fcf8e7bfe Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Tue, 25 Nov 2025 16:32:15 +0000 Subject: [PATCH 03/14] Add changes for chunking capabilities including local chunking and call to chunking from gliner recognizer --- .../presidio_analyzer/chunkers/__init__.py | 16 ++ .../chunkers/base_chunker.py | 16 ++ .../chunkers/chunking_utils.py | 103 ++++++++++++ .../chunkers/local_text_chunker.py | 61 +++++++ .../ner/gliner_recognizer.py | 141 ++++------------ .../tests/test_chunking_utils.py | 155 ++++++++++++++++++ presidio-analyzer/tests/test_text_chunker.py | 146 +++++++++++++++++ 7 files changed, 526 insertions(+), 112 deletions(-) create mode 100644 presidio-analyzer/presidio_analyzer/chunkers/__init__.py create mode 100644 presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py create mode 100644 presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py create mode 100644 presidio-analyzer/presidio_analyzer/chunkers/local_text_chunker.py create mode 100644 
presidio-analyzer/tests/test_chunking_utils.py create mode 100644 presidio-analyzer/tests/test_text_chunker.py diff --git a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py new file mode 100644 index 000000000..41379f784 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py @@ -0,0 +1,16 @@ +"""Text chunking strategies for handling long texts.""" +from presidio_analyzer.chunkers.base_chunker import BaseTextChunker +from presidio_analyzer.chunkers.local_text_chunker import LocalTextChunker +from presidio_analyzer.chunkers.chunking_utils import ( + predict_with_chunking, + process_text_in_chunks, + deduplicate_overlapping_entities, +) + +__all__ = [ + "BaseTextChunker", + "LocalTextChunker", + "predict_with_chunking", + "process_text_in_chunks", + "deduplicate_overlapping_entities", +] diff --git a/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py new file mode 100644 index 000000000..a85ae2765 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py @@ -0,0 +1,16 @@ +"""Abstract base class for text chunking strategies.""" +from abc import ABC, abstractmethod +from typing import List + + +class BaseTextChunker(ABC): + """Abstract base class for text chunking strategies.""" + + @abstractmethod + def chunk(self, text: str) -> List[str]: + """Split text into chunks. 
+ + :param text: The input text to split + :return: List of text chunks + """ + pass diff --git a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py new file mode 100644 index 000000000..ed4492396 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py @@ -0,0 +1,103 @@ +"""Utility functions for processing text with chunking strategies.""" +from typing import Any, Callable, Dict, List + +from presidio_analyzer.chunkers.base_chunker import BaseTextChunker + + +def predict_with_chunking( + text: str, + predict_func: Callable[[str], List[Dict[str, Any]]], + chunker: BaseTextChunker, +) -> List[Dict[str, Any]]: + """Process text with automatic chunking for long texts. + + For short text (≤ chunker.chunk_size), calls predict_func directly. + For long text, chunks it and merges predictions with deduplication. + + :param text: Input text to process + :param predict_func: Function that takes text and returns predictions + :param chunker: Text chunking strategy (contains chunk_size and chunk_overlap) + :return: List of predictions with correct offsets + """ + if len(text) <= chunker.chunk_size: + return predict_func(text) + + predictions = process_text_in_chunks( + text=text, + chunker=chunker, + process_func=predict_func, + chunk_overlap=chunker.chunk_overlap, + ) + return deduplicate_overlapping_entities(predictions) + +def process_text_in_chunks( + text: str, + chunker: BaseTextChunker, + process_func: Callable[[str], List[Dict[str, Any]]], + chunk_overlap: int, +) -> List[Dict[str, Any]]: + """Process text in chunks and adjust entity offsets. 
+ + :param text: Input text to process + :param chunker: Text chunking strategy + :param process_func: Function that takes chunk text and returns predictions + :param chunk_overlap: Number of characters overlapping between chunks + :return: List of predictions with adjusted offsets + """ + chunks = chunker.chunk(text) + all_predictions = [] + offset = 0 + + for chunk in chunks: + chunk_predictions = process_func(chunk) + + # Adjust offsets to match original text position + for pred in chunk_predictions: + pred["start"] += offset + pred["end"] += offset + + all_predictions.extend(chunk_predictions) + offset += len(chunk) - chunk_overlap + + return all_predictions + +def deduplicate_overlapping_entities( + predictions: List[Dict[str, Any]], overlap_threshold: float = 0.5 +) -> List[Dict[str, Any]]: + """Remove duplicate entities from overlapping chunks. + + :param predictions: List of predictions with 'start', 'end', 'label', 'score' + :param overlap_threshold: Overlap ratio threshold to consider duplicates (default: 0.5) + :return: Deduplicated list of predictions sorted by position + """ + if not predictions: + return predictions + + # Sort by score descending to keep highest scoring entities + sorted_preds = sorted(predictions, key=lambda p: p["score"], reverse=True) + unique = [] + + for pred in sorted_preds: + is_duplicate = False + for kept in unique: + # Check if same entity type and overlapping positions + if pred["label"] == kept["label"]: + overlap_start = max(pred["start"], kept["start"]) + overlap_end = min(pred["end"], kept["end"]) + + if overlap_start < overlap_end: + # Calculate overlap ratio + overlap_len = overlap_end - overlap_start + pred_len = pred["end"] - pred["start"] + kept_len = kept["end"] - kept["start"] + + # Check if overlap exceeds threshold + if overlap_len / min(pred_len, kept_len) > overlap_threshold: + is_duplicate = True + break + + if not is_duplicate: + unique.append(pred) + + # Sort by position for consistent output + return 
sorted(unique, key=lambda p: p["start"]) diff --git a/presidio-analyzer/presidio_analyzer/chunkers/local_text_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/local_text_chunker.py new file mode 100644 index 000000000..4e3eafb18 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/chunkers/local_text_chunker.py @@ -0,0 +1,61 @@ +"""Character-based text chunker with word boundary preservation. + +Based on gliner-spacy implementation: +https://github.com/theirstory/gliner-spacy/blob/main/gliner_spacy/pipeline.py#L60-L96 +""" +from typing import List + +from presidio_analyzer.chunkers.base_chunker import BaseTextChunker + + +class LocalTextChunker(BaseTextChunker): + """Character-based text chunker with word boundary preservation.""" + + def __init__(self, chunk_size: int, chunk_overlap: int = 0): + """Initialize the local text chunker. + + :param chunk_size: Maximum characters per chunk (must be > 0) + :param chunk_overlap: Characters to overlap between chunks (must be >= 0 and < chunk_size) + """ + if chunk_size <= 0: + raise ValueError("chunk_size must be greater than 0") + if chunk_overlap < 0 or chunk_overlap >= chunk_size: + raise ValueError( + "chunk_overlap must be non-negative and less than chunk_size" + ) + + self.chunk_size = chunk_size + self.chunk_overlap = chunk_overlap + + def chunk(self, text: str) -> List[str]: + """Split text into overlapping chunks at word boundaries. 
+ + :param text: The input text to chunk + :return: List of text chunks with overlap + """ + if not text: + return [] + + chunks = [] + start = 0 + + while start < len(text): + # Calculate end position + end = ( + start + self.chunk_size + if start + self.chunk_size < len(text) + else len(text) + ) + + # Extend to complete word boundary (space or newline) + while end < len(text) and text[end] not in [" ", "\n"]: + end += 1 + + chunks.append(text[start:end]) + + # Move start position with overlap (stop if we've covered all text) + if end >= len(text): + break + start = end - self.chunk_overlap + + return chunks diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py index f17a13da6..e2ea82e67 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py @@ -8,6 +8,11 @@ RecognizerResult, ) from presidio_analyzer.nlp_engine import NerModelConfiguration, NlpArtifacts +from presidio_analyzer.chunkers import ( + BaseTextChunker, + LocalTextChunker, + predict_with_chunking, +) try: from gliner import GLiNER, GLiNERConfig @@ -37,6 +42,7 @@ def __init__( map_location: str = "cpu", chunk_size: int = 250, chunk_overlap: int = 50, + text_chunker: Optional[BaseTextChunker] = None, ): """GLiNER model based entity recognizer. @@ -56,11 +62,9 @@ def __init__( :param threshold: The threshold for the model's output (see GLiNER's documentation) :param map_location: The device to use for the model - :param chunk_size: Maximum character length for text chunks. - Text longer than this will be split into chunks to avoid token truncation. - Default is 250 characters, matching gliner-spacy implementation. - :param chunk_overlap: Number of characters to overlap between chunks. - Overlap helps detect entities at chunk boundaries. 
Default is 50 characters. + :param chunk_size: Maximum character length for text chunks (default: 250) + :param chunk_overlap: Characters to overlap between chunks (default: 50) + :param text_chunker: Custom text chunking strategy. If None, uses LocalTextChunker """ @@ -96,6 +100,13 @@ def __init__( self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap + # Use provided chunker or default to LocalTextChunker + self.text_chunker = ( + text_chunker + if text_chunker is not None + else LocalTextChunker(chunk_size, chunk_overlap) + ) + self.gliner = None super().__init__( @@ -130,39 +141,19 @@ def analyze( # combine the input labels as this model allows for ad-hoc labels labels = self.__create_input_labels(entities) - # For short text, process directly - if len(text) <= self.chunk_size: - predictions = self.gliner.predict_entities( - text=text, - labels=labels, - flat_ner=self.flat_ner, - threshold=self.threshold, - multi_label=self.multi_label, - ) - else: - # Chunk long text and process each chunk - chunks = self._chunk_text(text) - predictions = [] - offset = 0 - - for chunk in chunks: - chunk_predictions = self.gliner.predict_entities( - text=chunk, - labels=labels, - flat_ner=self.flat_ner, - threshold=self.threshold, - multi_label=self.multi_label, - ) - # Adjust offsets to match original text position - for pred in chunk_predictions: - pred["start"] += offset - pred["end"] += offset - - predictions.extend(chunk_predictions) - offset += len(chunk) - self.chunk_overlap - - # Remove duplicate entities from overlapping chunks - predictions = self._deduplicate_predictions(predictions) + # Process text with automatic chunking + predict_func = lambda text: self.gliner.predict_entities( + text=text, + labels=labels, + flat_ner=self.flat_ner, + threshold=self.threshold, + multi_label=self.multi_label, + ) + predictions = predict_with_chunking( + text=text, + predict_func=predict_func, + chunker=self.text_chunker, + ) recognizer_results = [] for prediction in 
predictions: @@ -190,80 +181,6 @@ def analyze( return recognizer_results - def _chunk_text(self, text: str) -> List[str]: - """Split text into overlapping chunks at word boundaries. - - Based on gliner-spacy chunking strategy with overlap to catch entities - at chunk boundaries: - https://github.com/theirstory/gliner-spacy/blob/main/gliner_spacy/pipeline.py#L60-L96 - - :param text: The full text to chunk - :return: List of overlapping text chunks - """ - chunks = [] - start = 0 - - while start < len(text): - # Calculate end position - end = ( - start + self.chunk_size if start + self.chunk_size < len(text) else len(text) - ) - - # Ensure the chunk ends at a complete word - while end < len(text) and text[end] not in [" ", "\n"]: - end += 1 - - chunks.append(text[start:end]) - - # Move start position with overlap (stop if we've covered all text) - if end >= len(text): - break - start = end - self.chunk_overlap - - return chunks - - def _deduplicate_predictions(self, predictions: List[Dict]) -> List[Dict]: - """Remove duplicate entities from overlapping chunks. - - Two entities are considered duplicates if they overlap significantly. - Keeps the entity with the highest score. 
- - :param predictions: List of entity predictions with start, end, label, score - :return: Deduplicated list of predictions - """ - if not predictions: - return predictions - - # Sort by score descending to keep highest scoring entities - sorted_preds = sorted(predictions, key=lambda p: p["score"], reverse=True) - unique = [] - - for pred in sorted_preds: - # Check if this prediction overlaps significantly with any kept prediction - is_duplicate = False - for kept in unique: - # Check if same entity type and overlapping positions - if pred["label"] == kept["label"]: - overlap_start = max(pred["start"], kept["start"]) - overlap_end = min(pred["end"], kept["end"]) - - if overlap_start < overlap_end: - # Calculate overlap ratio - overlap_len = overlap_end - overlap_start - pred_len = pred["end"] - pred["start"] - kept_len = kept["end"] - kept["start"] - - # If >50% overlap, consider duplicate - if overlap_len / min(pred_len, kept_len) > 0.5: - is_duplicate = True - break - - if not is_duplicate: - unique.append(pred) - - # Sort by position for consistent output - return sorted(unique, key=lambda p: p["start"]) - def __create_input_labels(self, entities): """Append the entities requested by the user to the list of labels if it's not there.""" # noqa: E501 labels = self.gliner_labels diff --git a/presidio-analyzer/tests/test_chunking_utils.py b/presidio-analyzer/tests/test_chunking_utils.py new file mode 100644 index 000000000..99a1b99a3 --- /dev/null +++ b/presidio-analyzer/tests/test_chunking_utils.py @@ -0,0 +1,155 @@ +"""Tests for chunking utility functions.""" +import pytest + +from presidio_analyzer.chunkers import ( + LocalTextChunker, + process_text_in_chunks, + deduplicate_overlapping_entities, +) + + +class TestProcessTextInChunks: + """Test process_text_in_chunks utility function.""" + + def test_short_text_no_chunking(self): + """Test text shorter than chunk size is not chunked.""" + chunker = LocalTextChunker(chunk_size=100, chunk_overlap=20) + text = 
"Short text" + predict_func = lambda chunk: [{"start": 0, "end": 5, "label": "PERSON", "score": 0.9}] + + result = process_text_in_chunks(text, chunker, predict_func, chunk_overlap=20) + + assert len(result) == 1 + assert result[0]["start"] == 0 + assert result[0]["end"] == 5 + + def test_long_text_with_offset_adjustment(self): + """Test offset adjustment for chunked text.""" + chunker = LocalTextChunker(chunk_size=20, chunk_overlap=5) + text = "John Smith lives in New York City with Jane Doe" + + # Mock predict function that finds entities in each chunk + def predict_func(chunk): + if "John" in chunk: + return [{"start": 0, "end": 10, "label": "PERSON", "score": 0.9}] + elif "Jane" in chunk: + idx = chunk.index("Jane") + return [{"start": idx, "end": idx + 8, "label": "PERSON", "score": 0.85}] + return [] + + result = process_text_in_chunks(text, chunker, predict_func, chunk_overlap=5) + + # First entity should be at original position + assert result[0]["start"] == 0 + assert result[0]["end"] == 10 + # Second entity should have adjusted offset + assert result[1]["start"] > 20 # In second chunk + + def test_empty_predictions(self): + """Test handling of no predictions.""" + chunker = LocalTextChunker(chunk_size=50, chunk_overlap=10) + text = "Some text without entities" + predict_func = lambda chunk: [] + + result = process_text_in_chunks(text, chunker, predict_func, chunk_overlap=10) + + assert result == [] + + +class TestDeduplicateOverlappingEntities: + """Test deduplicate_overlapping_entities utility function.""" + + def test_no_duplicates(self): + """Test predictions with no overlap.""" + predictions = [ + {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, + {"start": 20, "end": 30, "label": "PERSON", "score": 0.85}, + ] + + result = deduplicate_overlapping_entities(predictions) + + assert len(result) == 2 + assert result[0]["start"] == 0 + assert result[1]["start"] == 20 + + def test_exact_duplicates_keeps_highest_score(self): + """Test exact 
duplicates keeps highest scoring entity.""" + predictions = [ + {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, + {"start": 0, "end": 10, "label": "PERSON", "score": 0.85}, + ] + + result = deduplicate_overlapping_entities(predictions) + + assert len(result) == 1 + assert result[0]["score"] == 0.9 + + def test_overlapping_duplicates(self): + """Test overlapping entities are deduplicated.""" + predictions = [ + {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, + {"start": 3, "end": 13, "label": "PERSON", "score": 0.85}, + ] + + result = deduplicate_overlapping_entities(predictions) + + # Overlap is 7 chars, ratio = 0.7 > 0.5 threshold + assert len(result) == 1 + assert result[0]["score"] == 0.9 + + def test_different_labels_not_deduplicated(self): + """Test overlapping entities with different labels are kept.""" + predictions = [ + {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, + {"start": 5, "end": 15, "label": "LOCATION", "score": 0.85}, + ] + + result = deduplicate_overlapping_entities(predictions) + + assert len(result) == 2 + + def test_low_overlap_not_deduplicated(self): + """Test entities with low overlap are not deduplicated.""" + predictions = [ + {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, + {"start": 9, "end": 20, "label": "PERSON", "score": 0.85}, + ] + + result = deduplicate_overlapping_entities(predictions, overlap_threshold=0.6) + + # Overlap is only 1 char out of 10, ratio = 0.1, below threshold + assert len(result) == 2 + + def test_empty_predictions(self): + """Test empty predictions list.""" + result = deduplicate_overlapping_entities([]) + assert result == [] + + def test_sorted_by_position(self): + """Test results are sorted by start position.""" + predictions = [ + {"start": 20, "end": 30, "label": "PERSON", "score": 0.9}, + {"start": 0, "end": 10, "label": "PERSON", "score": 0.85}, + {"start": 40, "end": 50, "label": "PERSON", "score": 0.95}, + ] + + result = 
deduplicate_overlapping_entities(predictions) + + assert result[0]["start"] == 0 + assert result[1]["start"] == 20 + assert result[2]["start"] == 40 + + def test_custom_overlap_threshold(self): + """Test custom overlap threshold.""" + predictions = [ + {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, + {"start": 5, "end": 15, "label": "PERSON", "score": 0.85}, + ] + + # With 0.3 threshold, should deduplicate (overlap ratio = 0.5) + result = deduplicate_overlapping_entities(predictions, overlap_threshold=0.3) + assert len(result) == 1 + + # With 0.7 threshold, should keep both (overlap ratio = 0.5 < 0.7) + result = deduplicate_overlapping_entities(predictions, overlap_threshold=0.7) + assert len(result) == 2 diff --git a/presidio-analyzer/tests/test_text_chunker.py b/presidio-analyzer/tests/test_text_chunker.py new file mode 100644 index 000000000..4bdd270f0 --- /dev/null +++ b/presidio-analyzer/tests/test_text_chunker.py @@ -0,0 +1,146 @@ +"""Tests for text chunking strategies.""" +import pytest + +from presidio_analyzer.chunkers import LocalTextChunker + + +class TestLocalTextChunker: + """Test LocalTextChunker implementation.""" + + def test_empty_text(self): + """Test chunking empty text.""" + chunker = LocalTextChunker(chunk_size=100, chunk_overlap=20) + result = chunker.chunk("") + assert result == [] + + def test_short_text(self): + """Test text shorter than chunk_size.""" + chunker = LocalTextChunker(chunk_size=100, chunk_overlap=20) + text = "This is a short text." 
+ result = chunker.chunk(text) + assert len(result) == 1 + assert result[0] == text + + def test_long_text_without_overlap(self): + """Test long text with no overlap.""" + chunker = LocalTextChunker(chunk_size=3, chunk_overlap=0) + text = "1 2 3 4" # 7 chars + result = chunker.chunk(text) + # Actual behavior: word boundaries extend chunks: ["1 2", " 3 4"] + assert len(result) == 2 + assert result[0] == "1 2" + assert result[1] == " 3 4" + + def test_long_text_with_overlap(self): + """Test long text with overlap.""" + chunker = LocalTextChunker(chunk_size=5, chunk_overlap=2) + text = "1 3 5 7 9" # 9 chars: positions 0-8 + result = chunker.chunk(text) + + assert len(result) == 2 + assert result[0] == "1 3 5" + assert result[1] == " 5 7 9" + # Verify overlap exists + assert result[0].endswith(" 5") and result[1].startswith(" 5") + + def test_word_boundary_preservation(self): + """Test that chunks extend to word boundaries.""" + chunker = LocalTextChunker(chunk_size=8, chunk_overlap=2) + text = "one two three four" + result = chunker.chunk(text) + # Chunks extend to word boundaries: "one two three" (13 chars) instead of breaking at 8 + assert result[0] == "one two three" + assert len(result) == 2 + + def test_exact_chunk_size(self): + """Test text that's exactly chunk_size.""" + chunker = LocalTextChunker(chunk_size=5, chunk_overlap=2) + text = "1 2 3" + result = chunker.chunk(text) + assert len(result) == 1 + assert result[0] == text + + def test_validation_zero_chunk_size(self): + """Test that chunk_size must be > 0.""" + with pytest.raises(ValueError, match="chunk_size must be greater than 0"): + LocalTextChunker(chunk_size=0, chunk_overlap=5) + + def test_validation_negative_chunk_size(self): + """Test that chunk_size cannot be negative.""" + with pytest.raises(ValueError, match="chunk_size must be greater than 0"): + LocalTextChunker(chunk_size=-10, chunk_overlap=5) + + def test_validation_negative_overlap(self): + """Test that overlap cannot be negative.""" + 
with pytest.raises( + ValueError, match="chunk_overlap must be non-negative and less than chunk_size" + ): + LocalTextChunker(chunk_size=100, chunk_overlap=-5) + + def test_validation_overlap_equals_chunk_size(self): + """Test that overlap cannot equal chunk_size.""" + with pytest.raises( + ValueError, match="chunk_overlap must be non-negative and less than chunk_size" + ): + LocalTextChunker(chunk_size=100, chunk_overlap=100) + + def test_validation_overlap_greater_than_chunk_size(self): + """Test that overlap cannot exceed chunk_size.""" + with pytest.raises( + ValueError, match="chunk_overlap must be non-negative and less than chunk_size" + ): + LocalTextChunker(chunk_size=50, chunk_overlap=75) + + def test_multiple_chunks_coverage(self): + """Test that chunks cover entire text.""" + chunker = LocalTextChunker(chunk_size=5, chunk_overlap=1) + text = "1 2 3 4 5 6" # 11 chars + result = chunker.chunk(text) + # Verify all numbers appear in at least one chunk + all_text = "".join(result) + assert all(num in all_text for num in ["1", "2", "3", "4", "5", "6"]) + + def test_newline_handling(self): + """Test that newlines are preserved and treated as word boundaries.""" + chunker = LocalTextChunker(chunk_size=10, chunk_overlap=0) + text = "line1\nline2\nline3" + result = chunker.chunk(text) + # Newlines should be preserved in output + combined = "".join(result) + assert combined == text + # Verify newlines exist in chunks + assert any("\n" in chunk for chunk in result) + + def test_default_parameters(self): + """Test chunker with default overlap (0).""" + chunker = LocalTextChunker(chunk_size=5) # No overlap specified + text = "1 2 3 4" + result = chunker.chunk(text) + assert len(result) == 2 + + def test_very_long_text(self): + """Test chunking very long text.""" + chunker = LocalTextChunker(chunk_size=10, chunk_overlap=2) + text = " ".join([str(i) for i in range(50)]) # "0 1 2 3..." 
+ result = chunker.chunk(text) + # Should create many chunks + assert len(result) > 5 + # Verify chunks are reasonable size + for chunk in result: + assert len(chunk) <= 15 + + def test_real_world_example(self): + """Test with real-world PII detection scenario.""" + chunker = LocalTextChunker(chunk_size=250, chunk_overlap=50) + text = """John Smith's credit card number is 4532-1234-5678-9010. + His social security number is 123-45-6789 and his email is john.smith@example.com. + He lives at 123 Main Street, Anytown, ST 12345. + For contact, his phone number is (555) 123-4567.""" + result = chunker.chunk(text) + # Should be 1-2 chunks depending on exact length + assert 1 <= len(result) <= 2 + # All PII should be present in at least one chunk + all_text = " ".join(result) + assert "4532-1234-5678-9010" in all_text + assert "123-45-6789" in all_text + assert "john.smith@example.com" in all_text From 71fb61197f6396679d1bfbb9c30c2be8ba96050d Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Wed, 26 Nov 2025 11:14:38 +0000 Subject: [PATCH 04/14] Remove gliner image redaction test - not required --- .../tests/test_gliner_token_truncation.py | 84 ------------------- 1 file changed, 84 deletions(-) delete mode 100644 presidio-image-redactor/tests/test_gliner_token_truncation.py diff --git a/presidio-image-redactor/tests/test_gliner_token_truncation.py b/presidio-image-redactor/tests/test_gliner_token_truncation.py deleted file mode 100644 index 8dbf9adc9..000000000 --- a/presidio-image-redactor/tests/test_gliner_token_truncation.py +++ /dev/null @@ -1,84 +0,0 @@ -"""Test GLiNER token truncation behavior with entities beyond 384 token limit.""" -import pytest -from PIL import Image, ImageDraw, ImageFont -from presidio_analyzer import AnalyzerEngine, RecognizerRegistry -from presidio_image_redactor import ImageAnalyzerEngine -from presidio_analyzer.predefined_recognizers import GLiNERRecognizer - - - -@pytest.fixture(scope="module") -def 
mock_image_with_late_entities(): - """Create a test image with person names before and after the 384 token limit.""" - img = Image.new('RGB', (1000, 1400), color='white') - draw = ImageDraw.Draw(img) - font = ImageFont.load_default() - - text_lines = [ - "Patient: Dr. Amanda Williams", - "Doctor: Dr. James Patterson", - "", - ] - - # Add filler text to exceed 384 token limit - text_lines.extend([ - f"Par{i}: Medical documentation regarding treatment protocols. " - f"The facility maintains comprehensive records of consultations. " - f"Standard procedures require detailed documentation." - for i in range(60) - ]) - - # Add names beyond token limit - text_lines.extend(["", "Nurse: Jennifer Anderson", "Therapist: Christopher Davis"]) - - # Draw text on image - y = 30 - for line in text_lines: - draw.text((30, y), line, fill='black', font=font) - y += 20 - - return img - -def extract_detected_names(results, ocr_text, expected_names): - """Extract which expected names were detected from analyzer results.""" - detected_names = set() - - for result in results: - if result.start < len(ocr_text) and result.end <= len(ocr_text): - context = ocr_text[max(0, result.start - 20):min(len(ocr_text), result.end + 20)] - for name in expected_names: - if name in context or all(part in context for part in name.split()): - detected_names.add(name) - - return detected_names - -def test_gliner_truncates_entities_beyond_384_tokens(mock_image_with_late_entities): - """Test that GLiNER detects early names but misses names beyond 384 token limit.""" - # Setup analyzer with only GLiNER recognizer - registry = RecognizerRegistry() - registry.add_recognizer(GLiNERRecognizer()) - analyzer = AnalyzerEngine(registry=registry) - - # Analyze the image - image_analyzer = ImageAnalyzerEngine(analyzer_engine=analyzer) - results = image_analyzer.analyze(mock_image_with_late_entities) - - # Get OCR text for name extraction - ocr_text = image_analyzer.ocr.get_text_from_ocr_dict( - 
image_analyzer.ocr.perform_ocr(mock_image_with_late_entities) - ) - - # Extract detected names - expected_names = ["Amanda Williams", "James Patterson", "Jennifer Anderson", "Christopher Davis"] - detected_names = extract_detected_names(results, ocr_text, expected_names) - - for name in expected_names: - print(f" {'✅' if name in detected_names else '❌'} {name}") - - # Assert early names are detected - assert "Amanda Williams" in detected_names, "Early name should be detected" - assert "James Patterson" in detected_names, "Early name should be detected" - - # Assert late names are detected (will fail due to truncation) - assert "Jennifer Anderson" in detected_names, "Late name missed" - assert "Christopher Davis" in detected_names, "Late name missed" From c986737990dc720d4c6fd18dcb359b3a4c006ce8 Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Wed, 26 Nov 2025 14:33:22 +0000 Subject: [PATCH 05/14] Rename local text chunker to character based text chunker --- ...ker.py => character_based_text_chunker.py} | 2 +- .../ner/gliner_recognizer.py | 4 +-- ...y => test_character_based_text_chunker.py} | 34 +++++++++---------- .../tests/test_chunking_utils.py | 8 ++--- 4 files changed, 24 insertions(+), 24 deletions(-) rename presidio-analyzer/presidio_analyzer/chunkers/{local_text_chunker.py => character_based_text_chunker.py} (97%) rename presidio-analyzer/tests/{test_text_chunker.py => test_character_based_text_chunker.py} (79%) diff --git a/presidio-analyzer/presidio_analyzer/chunkers/local_text_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py similarity index 97% rename from presidio-analyzer/presidio_analyzer/chunkers/local_text_chunker.py rename to presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py index 4e3eafb18..f4392e7da 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/local_text_chunker.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py @@ -8,7 
+8,7 @@ from presidio_analyzer.chunkers.base_chunker import BaseTextChunker -class LocalTextChunker(BaseTextChunker): +class CharacterBasedTextChunker(BaseTextChunker): """Character-based text chunker with word boundary preservation.""" def __init__(self, chunk_size: int, chunk_overlap: int = 0): diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py index e2ea82e67..343767bb1 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py @@ -10,7 +10,7 @@ from presidio_analyzer.nlp_engine import NerModelConfiguration, NlpArtifacts from presidio_analyzer.chunkers import ( BaseTextChunker, - LocalTextChunker, + CharacterBasedTextChunker, predict_with_chunking, ) @@ -104,7 +104,7 @@ def __init__( self.text_chunker = ( text_chunker if text_chunker is not None - else LocalTextChunker(chunk_size, chunk_overlap) + else CharacterBasedTextChunker(chunk_size, chunk_overlap) ) self.gliner = None diff --git a/presidio-analyzer/tests/test_text_chunker.py b/presidio-analyzer/tests/test_character_based_text_chunker.py similarity index 79% rename from presidio-analyzer/tests/test_text_chunker.py rename to presidio-analyzer/tests/test_character_based_text_chunker.py index 4bdd270f0..46b258abb 100644 --- a/presidio-analyzer/tests/test_text_chunker.py +++ b/presidio-analyzer/tests/test_character_based_text_chunker.py @@ -1,7 +1,7 @@ """Tests for text chunking strategies.""" import pytest -from presidio_analyzer.chunkers import LocalTextChunker +from presidio_analyzer.chunkers import CharacterBasedTextChunker class TestLocalTextChunker: @@ -9,13 +9,13 @@ class TestLocalTextChunker: def test_empty_text(self): """Test chunking empty text.""" - chunker = LocalTextChunker(chunk_size=100, chunk_overlap=20) + chunker = 
CharacterBasedTextChunker(chunk_size=100, chunk_overlap=20) result = chunker.chunk("") assert result == [] def test_short_text(self): """Test text shorter than chunk_size.""" - chunker = LocalTextChunker(chunk_size=100, chunk_overlap=20) + chunker = CharacterBasedTextChunker(chunk_size=100, chunk_overlap=20) text = "This is a short text." result = chunker.chunk(text) assert len(result) == 1 @@ -23,7 +23,7 @@ def test_short_text(self): def test_long_text_without_overlap(self): """Test long text with no overlap.""" - chunker = LocalTextChunker(chunk_size=3, chunk_overlap=0) + chunker = CharacterBasedTextChunker(chunk_size=3, chunk_overlap=0) text = "1 2 3 4" # 7 chars result = chunker.chunk(text) # Actual behavior: word boundaries extend chunks: ["1 2", " 3 4"] @@ -33,7 +33,7 @@ def test_long_text_without_overlap(self): def test_long_text_with_overlap(self): """Test long text with overlap.""" - chunker = LocalTextChunker(chunk_size=5, chunk_overlap=2) + chunker = CharacterBasedTextChunker(chunk_size=5, chunk_overlap=2) text = "1 3 5 7 9" # 9 chars: positions 0-8 result = chunker.chunk(text) @@ -45,7 +45,7 @@ def test_long_text_with_overlap(self): def test_word_boundary_preservation(self): """Test that chunks extend to word boundaries.""" - chunker = LocalTextChunker(chunk_size=8, chunk_overlap=2) + chunker = CharacterBasedTextChunker(chunk_size=8, chunk_overlap=2) text = "one two three four" result = chunker.chunk(text) # Chunks extend to word boundaries: "one two three" (13 chars) instead of breaking at 8 @@ -54,7 +54,7 @@ def test_word_boundary_preservation(self): def test_exact_chunk_size(self): """Test text that's exactly chunk_size.""" - chunker = LocalTextChunker(chunk_size=5, chunk_overlap=2) + chunker = CharacterBasedTextChunker(chunk_size=5, chunk_overlap=2) text = "1 2 3" result = chunker.chunk(text) assert len(result) == 1 @@ -63,37 +63,37 @@ def test_exact_chunk_size(self): def test_validation_zero_chunk_size(self): """Test that chunk_size must be > 0.""" 
with pytest.raises(ValueError, match="chunk_size must be greater than 0"): - LocalTextChunker(chunk_size=0, chunk_overlap=5) + CharacterBasedTextChunker(chunk_size=0, chunk_overlap=5) def test_validation_negative_chunk_size(self): """Test that chunk_size cannot be negative.""" with pytest.raises(ValueError, match="chunk_size must be greater than 0"): - LocalTextChunker(chunk_size=-10, chunk_overlap=5) + CharacterBasedTextChunker(chunk_size=-10, chunk_overlap=5) def test_validation_negative_overlap(self): """Test that overlap cannot be negative.""" with pytest.raises( ValueError, match="chunk_overlap must be non-negative and less than chunk_size" ): - LocalTextChunker(chunk_size=100, chunk_overlap=-5) + CharacterBasedTextChunker(chunk_size=100, chunk_overlap=-5) def test_validation_overlap_equals_chunk_size(self): """Test that overlap cannot equal chunk_size.""" with pytest.raises( ValueError, match="chunk_overlap must be non-negative and less than chunk_size" ): - LocalTextChunker(chunk_size=100, chunk_overlap=100) + CharacterBasedTextChunker(chunk_size=100, chunk_overlap=100) def test_validation_overlap_greater_than_chunk_size(self): """Test that overlap cannot exceed chunk_size.""" with pytest.raises( ValueError, match="chunk_overlap must be non-negative and less than chunk_size" ): - LocalTextChunker(chunk_size=50, chunk_overlap=75) + CharacterBasedTextChunker(chunk_size=50, chunk_overlap=75) def test_multiple_chunks_coverage(self): """Test that chunks cover entire text.""" - chunker = LocalTextChunker(chunk_size=5, chunk_overlap=1) + chunker = CharacterBasedTextChunker(chunk_size=5, chunk_overlap=1) text = "1 2 3 4 5 6" # 11 chars result = chunker.chunk(text) # Verify all numbers appear in at least one chunk @@ -102,7 +102,7 @@ def test_multiple_chunks_coverage(self): def test_newline_handling(self): """Test that newlines are preserved and treated as word boundaries.""" - chunker = LocalTextChunker(chunk_size=10, chunk_overlap=0) + chunker = 
CharacterBasedTextChunker(chunk_size=10, chunk_overlap=0) text = "line1\nline2\nline3" result = chunker.chunk(text) # Newlines should be preserved in output @@ -113,14 +113,14 @@ def test_newline_handling(self): def test_default_parameters(self): """Test chunker with default overlap (0).""" - chunker = LocalTextChunker(chunk_size=5) # No overlap specified + chunker = CharacterBasedTextChunker(chunk_size=5) # No overlap specified text = "1 2 3 4" result = chunker.chunk(text) assert len(result) == 2 def test_very_long_text(self): """Test chunking very long text.""" - chunker = LocalTextChunker(chunk_size=10, chunk_overlap=2) + chunker = CharacterBasedTextChunker(chunk_size=10, chunk_overlap=2) text = " ".join([str(i) for i in range(50)]) # "0 1 2 3..." result = chunker.chunk(text) # Should create many chunks @@ -131,7 +131,7 @@ def test_very_long_text(self): def test_real_world_example(self): """Test with real-world PII detection scenario.""" - chunker = LocalTextChunker(chunk_size=250, chunk_overlap=50) + chunker = CharacterBasedTextChunker(chunk_size=250, chunk_overlap=50) text = """John Smith's credit card number is 4532-1234-5678-9010. His social security number is 123-45-6789 and his email is john.smith@example.com. He lives at 123 Main Street, Anytown, ST 12345. 
diff --git a/presidio-analyzer/tests/test_chunking_utils.py b/presidio-analyzer/tests/test_chunking_utils.py index 99a1b99a3..3b19129d4 100644 --- a/presidio-analyzer/tests/test_chunking_utils.py +++ b/presidio-analyzer/tests/test_chunking_utils.py @@ -2,7 +2,7 @@ import pytest from presidio_analyzer.chunkers import ( - LocalTextChunker, + CharacterBasedTextChunker, process_text_in_chunks, deduplicate_overlapping_entities, ) @@ -13,7 +13,7 @@ class TestProcessTextInChunks: def test_short_text_no_chunking(self): """Test text shorter than chunk size is not chunked.""" - chunker = LocalTextChunker(chunk_size=100, chunk_overlap=20) + chunker = CharacterBasedTextChunker(chunk_size=100, chunk_overlap=20) text = "Short text" predict_func = lambda chunk: [{"start": 0, "end": 5, "label": "PERSON", "score": 0.9}] @@ -25,7 +25,7 @@ def test_short_text_no_chunking(self): def test_long_text_with_offset_adjustment(self): """Test offset adjustment for chunked text.""" - chunker = LocalTextChunker(chunk_size=20, chunk_overlap=5) + chunker = CharacterBasedTextChunker(chunk_size=20, chunk_overlap=5) text = "John Smith lives in New York City with Jane Doe" # Mock predict function that finds entities in each chunk @@ -47,7 +47,7 @@ def predict_func(chunk): def test_empty_predictions(self): """Test handling of no predictions.""" - chunker = LocalTextChunker(chunk_size=50, chunk_overlap=10) + chunker = CharacterBasedTextChunker(chunk_size=50, chunk_overlap=10) text = "Some text without entities" predict_func = lambda chunk: [] From ea49b70817c8a433e38306939519b41ccb14eeb4 Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Wed, 26 Nov 2025 16:42:16 +0000 Subject: [PATCH 06/14] Fix rename leftovers --- presidio-analyzer/presidio_analyzer/chunkers/__init__.py | 4 ++-- .../predefined_recognizers/ner/gliner_recognizer.py | 4 ++-- presidio-analyzer/tests/test_character_based_text_chunker.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git 
a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py index 41379f784..13a28180a 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py @@ -1,6 +1,6 @@ """Text chunking strategies for handling long texts.""" from presidio_analyzer.chunkers.base_chunker import BaseTextChunker -from presidio_analyzer.chunkers.local_text_chunker import LocalTextChunker +from presidio_analyzer.chunkers.character_based_text_chunker import CharacterBasedTextChunker from presidio_analyzer.chunkers.chunking_utils import ( predict_with_chunking, process_text_in_chunks, @@ -9,7 +9,7 @@ __all__ = [ "BaseTextChunker", - "LocalTextChunker", + "CharacterBasedTextChunker", "predict_with_chunking", "process_text_in_chunks", "deduplicate_overlapping_entities", diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py index 343767bb1..16f0d2a02 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py @@ -64,7 +64,7 @@ def __init__( :param map_location: The device to use for the model :param chunk_size: Maximum character length for text chunks (default: 250) :param chunk_overlap: Characters to overlap between chunks (default: 50) - :param text_chunker: Custom text chunking strategy. If None, uses LocalTextChunker + :param text_chunker: Custom text chunking strategy. 
If None, uses CharacterBasedTextChunker """ @@ -100,7 +100,7 @@ def __init__( self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap - # Use provided chunker or default to LocalTextChunker + # Use provided chunker or default to CharacterBasedTextChunker self.text_chunker = ( text_chunker if text_chunker is not None diff --git a/presidio-analyzer/tests/test_character_based_text_chunker.py b/presidio-analyzer/tests/test_character_based_text_chunker.py index 46b258abb..e21334946 100644 --- a/presidio-analyzer/tests/test_character_based_text_chunker.py +++ b/presidio-analyzer/tests/test_character_based_text_chunker.py @@ -4,8 +4,8 @@ from presidio_analyzer.chunkers import CharacterBasedTextChunker -class TestLocalTextChunker: - """Test LocalTextChunker implementation.""" +class TestCharacterBasedTextChunker: + """Test CharacterBasedTextChunker implementation.""" def test_empty_text(self): """Test chunking empty text.""" From 83e2bd4ebdd2b111a961ad9a2cf45617af24de22 Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Thu, 27 Nov 2025 18:18:40 +0000 Subject: [PATCH 07/14] Update doc string --- .../presidio_analyzer/chunkers/__init__.py | 1 - .../chunkers/character_based_text_chunker.py | 13 ++++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py index 13a28180a..5f2b08388 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py @@ -11,6 +11,5 @@ "BaseTextChunker", "CharacterBasedTextChunker", "predict_with_chunking", - "process_text_in_chunks", "deduplicate_overlapping_entities", ] diff --git a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py index f4392e7da..a9fa0b245 100644 --- 
a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py @@ -12,10 +12,13 @@ class CharacterBasedTextChunker(BaseTextChunker): """Character-based text chunker with word boundary preservation.""" def __init__(self, chunk_size: int, chunk_overlap: int = 0): - """Initialize the local text chunker. + """Initialize the character-based text chunker. + + Note: Chunks may slightly exceed chunk_size to preserve complete words. + When this occurs, the actual overlap may vary from the specified value. - :param chunk_size: Maximum characters per chunk (must be > 0) - :param chunk_overlap: Characters to overlap between chunks (must be >= 0 and < chunk_size) + :param chunk_size: Target maximum characters per chunk (must be > 0) + :param chunk_overlap: Target characters to overlap between chunks (must be >= 0 and < chunk_size) """ if chunk_size <= 0: raise ValueError("chunk_size must be greater than 0") @@ -30,6 +33,10 @@ def __init__(self, chunk_size: int, chunk_overlap: int = 0): def chunk(self, text: str) -> List[str]: """Split text into overlapping chunks at word boundaries. + Chunks are extended to the nearest word boundary (space or newline) to avoid + splitting words. This means chunks may slightly exceed chunk_size. For texts + without spaces (e.g., CJK languages), chunks may extend to end of text. 
+ :param text: The input text to chunk :return: List of text chunks with overlap """ From 555324551c26f15709237e90ec38e6704ad9027b Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Tue, 2 Dec 2025 16:47:05 +0000 Subject: [PATCH 08/14] Add test for text without spaces and unicodes --- .../test_character_based_text_chunker.py | 81 ++++++++++++++----- 1 file changed, 62 insertions(+), 19 deletions(-) diff --git a/presidio-analyzer/tests/test_character_based_text_chunker.py b/presidio-analyzer/tests/test_character_based_text_chunker.py index e21334946..e8fae2994 100644 --- a/presidio-analyzer/tests/test_character_based_text_chunker.py +++ b/presidio-analyzer/tests/test_character_based_text_chunker.py @@ -94,40 +94,58 @@ def test_validation_overlap_greater_than_chunk_size(self): def test_multiple_chunks_coverage(self): """Test that chunks cover entire text.""" chunker = CharacterBasedTextChunker(chunk_size=5, chunk_overlap=1) - text = "1 2 3 4 5 6" # 11 chars + text = "1 2 3 4 5 6" # 11 chars: positions 0-10 result = chunker.chunk(text) - # Verify all numbers appear in at least one chunk + # Actual result: ['1 2 3', '3 4 5', '5 6'] + assert len(result) == 3 + assert result[0] == "1 2 3" + assert result[1] == "3 4 5" + assert result[2] == "5 6" + # Verify all digits appear (overlap causes duplication in joined string) all_text = "".join(result) - assert all(num in all_text for num in ["1", "2", "3", "4", "5", "6"]) + for digit in ["1", "2", "3", "4", "5", "6"]: + assert digit in all_text def test_newline_handling(self): """Test that newlines are preserved and treated as word boundaries.""" chunker = CharacterBasedTextChunker(chunk_size=10, chunk_overlap=0) - text = "line1\nline2\nline3" + text = "line1\nline2\nline3" # 17 chars result = chunker.chunk(text) - # Newlines should be preserved in output - combined = "".join(result) - assert combined == text - # Verify newlines exist in chunks - assert any("\n" in chunk for chunk in result) + # Chunk 1: "line1\nline2" 
(12 chars, extends to newline boundary at position 11) + # Chunk 2: "\nline3" (remaining 6 chars) + assert len(result) == 2 + assert result[0] == "line1\nline2" + assert result[1] == "\nline3" + # Verify complete text preserved + assert "".join(result) == text def test_default_parameters(self): """Test chunker with default overlap (0).""" - chunker = CharacterBasedTextChunker(chunk_size=5) # No overlap specified - text = "1 2 3 4" + chunker = CharacterBasedTextChunker(chunk_size=5) # No overlap specified (default=0) + text = "1 2 3 4" # 7 chars result = chunker.chunk(text) + # Chunk 1: "1 2 3" (5 chars, extends to word boundary at position 4) + # Chunk 2: starts at position 5: " 4" (remaining) assert len(result) == 2 + assert result[0] == "1 2 3" + assert result[1] == " 4" def test_very_long_text(self): """Test chunking very long text.""" chunker = CharacterBasedTextChunker(chunk_size=10, chunk_overlap=2) text = " ".join([str(i) for i in range(50)]) # "0 1 2 3..." + # Text: "0 1 2 3 4 5 6 7 8 9 10 11..." = 138 chars result = chunker.chunk(text) - # Should create many chunks - assert len(result) > 5 - # Verify chunks are reasonable size - for chunk in result: - assert len(chunk) <= 15 + # With chunk_size=10, overlap=2, word boundaries: creates 16 chunks + assert len(result) == 16 + # First chunk + assert result[0] == "0 1 2 3 4 5" + # Last chunk + assert result[-1] == "48 49" + # Verify all numbers appear in chunks + all_text = " ".join(result) + for i in range(50): + assert str(i) in all_text def test_real_world_example(self): """Test with real-world PII detection scenario.""" @@ -137,10 +155,35 @@ def test_real_world_example(self): He lives at 123 Main Street, Anytown, ST 12345. 
For contact, his phone number is (555) 123-4567.""" result = chunker.chunk(text) - # Should be 1-2 chunks depending on exact length - assert 1 <= len(result) <= 2 - # All PII should be present in at least one chunk + # Text is 251 chars, creates 2 chunks with overlap + assert len(result) == 2 + # All PII should be present across chunks all_text = " ".join(result) assert "4532-1234-5678-9010" in all_text assert "123-45-6789" in all_text assert "john.smith@example.com" in all_text + assert "123-4567" in all_text + + def test_cjk_text_without_spaces(self): + """Test CJK text without spaces extends to end of text.""" + chunker = CharacterBasedTextChunker(chunk_size=5, chunk_overlap=1) + text = "你好世界测试" # 6 Chinese characters, no spaces + result = chunker.chunk(text) + # No spaces, so first chunk extends all the way to end + # (word boundary extension continues until end of text) + assert len(result) == 1 + assert result[0] == text + + def test_unicode_emoji_handling(self): + """Test Unicode characters and emojis are handled correctly.""" + chunker = CharacterBasedTextChunker(chunk_size=10, chunk_overlap=2) + text = "Hello 👋 World 🌍 Test" + result = chunker.chunk(text) + # Verify emojis are preserved in chunks + all_text = "".join(result) + assert "👋" in all_text + assert "🌍" in all_text + # Verify all words appear (overlap may cause partial duplication) + assert "Hello" in all_text + assert "World" in all_text # May appear as 'Worldld' due to overlap + assert "Test" in all_text From 0d53ce135d91df4da7c39f8417cb392d008c5afc Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Tue, 2 Dec 2025 17:11:48 +0000 Subject: [PATCH 09/14] Resolve linting - format code --- .../presidio_analyzer/chunkers/__init__.py | 8 +++-- .../chunkers/base_chunker.py | 2 +- .../chunkers/character_based_text_chunker.py | 16 +++++----- .../chunkers/chunking_utils.py | 18 +++++++----- .../ner/gliner_recognizer.py | 29 +++++++++++-------- 5 files changed, 43 insertions(+), 30 deletions(-) diff 
--git a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py index 5f2b08388..d73e4032d 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py @@ -1,15 +1,19 @@ """Text chunking strategies for handling long texts.""" + from presidio_analyzer.chunkers.base_chunker import BaseTextChunker -from presidio_analyzer.chunkers.character_based_text_chunker import CharacterBasedTextChunker +from presidio_analyzer.chunkers.character_based_text_chunker import ( + CharacterBasedTextChunker, +) from presidio_analyzer.chunkers.chunking_utils import ( + deduplicate_overlapping_entities, predict_with_chunking, process_text_in_chunks, - deduplicate_overlapping_entities, ) __all__ = [ "BaseTextChunker", "CharacterBasedTextChunker", "predict_with_chunking", + "process_text_in_chunks", "deduplicate_overlapping_entities", ] diff --git a/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py index a85ae2765..e9e331007 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py @@ -9,7 +9,7 @@ class BaseTextChunker(ABC): @abstractmethod def chunk(self, text: str) -> List[str]: """Split text into chunks. - + :param text: The input text to split :return: List of text chunks """ diff --git a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py index a9fa0b245..63b1656b9 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py @@ -16,9 +16,10 @@ def __init__(self, chunk_size: int, chunk_overlap: int = 0): Note: Chunks may slightly exceed chunk_size to preserve complete words. 
When this occurs, the actual overlap may vary from the specified value. - + :param chunk_size: Target maximum characters per chunk (must be > 0) - :param chunk_overlap: Target characters to overlap between chunks (must be >= 0 and < chunk_size) + :param chunk_overlap: Target characters to overlap between chunks + (must be >= 0 and < chunk_size) """ if chunk_size <= 0: raise ValueError("chunk_size must be greater than 0") @@ -32,11 +33,12 @@ def __init__(self, chunk_size: int, chunk_overlap: int = 0): def chunk(self, text: str) -> List[str]: """Split text into overlapping chunks at word boundaries. - - Chunks are extended to the nearest word boundary (space or newline) to avoid - splitting words. This means chunks may slightly exceed chunk_size. For texts - without spaces (e.g., CJK languages), chunks may extend to end of text. - + + Chunks are extended to the nearest word boundary (space or newline) + to avoid splitting words. This means chunks may slightly exceed + chunk_size. For texts without spaces (e.g., CJK languages), chunks + may extend to end of text. + :param text: The input text to chunk :return: List of text chunks with overlap """ diff --git a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py index ed4492396..5c2dc2b48 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py @@ -10,10 +10,10 @@ def predict_with_chunking( chunker: BaseTextChunker, ) -> List[Dict[str, Any]]: """Process text with automatic chunking for long texts. - + For short text (≤ chunker.chunk_size), calls predict_func directly. For long text, chunks it and merges predictions with deduplication. 
- + :param text: Input text to process :param predict_func: Function that takes text and returns predictions :param chunker: Text chunking strategy (contains chunk_size and chunk_overlap) @@ -21,7 +21,7 @@ def predict_with_chunking( """ if len(text) <= chunker.chunk_size: return predict_func(text) - + predictions = process_text_in_chunks( text=text, chunker=chunker, @@ -37,7 +37,7 @@ def process_text_in_chunks( chunk_overlap: int, ) -> List[Dict[str, Any]]: """Process text in chunks and adjust entity offsets. - + :param text: Input text to process :param chunker: Text chunking strategy :param process_func: Function that takes chunk text and returns predictions @@ -50,7 +50,7 @@ def process_text_in_chunks( for chunk in chunks: chunk_predictions = process_func(chunk) - + # Adjust offsets to match original text position for pred in chunk_predictions: pred["start"] += offset @@ -65,9 +65,11 @@ def deduplicate_overlapping_entities( predictions: List[Dict[str, Any]], overlap_threshold: float = 0.5 ) -> List[Dict[str, Any]]: """Remove duplicate entities from overlapping chunks. 
- - :param predictions: List of predictions with 'start', 'end', 'label', 'score' - :param overlap_threshold: Overlap ratio threshold to consider duplicates (default: 0.5) + + :param predictions: List of predictions with 'start', 'end', 'label', + 'score' + :param overlap_threshold: Overlap ratio threshold to consider duplicates + (default: 0.5) :return: Deduplicated list of predictions sorted by position """ if not predictions: diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py index 16f0d2a02..36ef693f1 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py @@ -1,18 +1,18 @@ import json import logging -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from presidio_analyzer import ( AnalysisExplanation, LocalRecognizer, RecognizerResult, ) -from presidio_analyzer.nlp_engine import NerModelConfiguration, NlpArtifacts from presidio_analyzer.chunkers import ( BaseTextChunker, CharacterBasedTextChunker, predict_with_chunking, ) +from presidio_analyzer.nlp_engine import NerModelConfiguration, NlpArtifacts try: from gliner import GLiNER, GLiNERConfig @@ -62,9 +62,12 @@ def __init__( :param threshold: The threshold for the model's output (see GLiNER's documentation) :param map_location: The device to use for the model - :param chunk_size: Maximum character length for text chunks (default: 250) - :param chunk_overlap: Characters to overlap between chunks (default: 50) - :param text_chunker: Custom text chunking strategy. If None, uses CharacterBasedTextChunker + :param chunk_size: Maximum character length for text chunks + (default: 250) + :param chunk_overlap: Characters to overlap between chunks + (default: 50) + :param text_chunker: Custom text chunking strategy. 
If None, uses + CharacterBasedTextChunker """ @@ -142,13 +145,15 @@ def analyze( labels = self.__create_input_labels(entities) # Process text with automatic chunking - predict_func = lambda text: self.gliner.predict_entities( - text=text, - labels=labels, - flat_ner=self.flat_ner, - threshold=self.threshold, - multi_label=self.multi_label, - ) + def predict_func(text: str) -> List[Dict[str, Any]]: + return self.gliner.predict_entities( + text=text, + labels=labels, + flat_ner=self.flat_ner, + threshold=self.threshold, + multi_label=self.multi_label, + ) + predictions = predict_with_chunking( text=text, predict_func=predict_func, From 560021ce5dd7599d0a4335161821a47e401733df Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Wed, 3 Dec 2025 12:03:42 +0000 Subject: [PATCH 10/14] Add logging to character based text chunker --- .../chunkers/character_based_text_chunker.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py index 63b1656b9..90f5242f2 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py @@ -3,10 +3,13 @@ Based on gliner-spacy implementation: https://github.com/theirstory/gliner-spacy/blob/main/gliner_spacy/pipeline.py#L60-L96 """ +import logging from typing import List from presidio_analyzer.chunkers.base_chunker import BaseTextChunker +logger = logging.getLogger("presidio-analyzer") + class CharacterBasedTextChunker(BaseTextChunker): """Character-based text chunker with word boundary preservation.""" @@ -22,8 +25,12 @@ def __init__(self, chunk_size: int, chunk_overlap: int = 0): (must be >= 0 and < chunk_size) """ if chunk_size <= 0: + logger.error("Invalid chunk_size: %d. 
Must be greater than 0.", chunk_size) raise ValueError("chunk_size must be greater than 0") if chunk_overlap < 0 or chunk_overlap >= chunk_size: + logger.error( + "Invalid chunk_overlap. Must be non-negative and less than chunk_size" + ) raise ValueError( "chunk_overlap must be non-negative and less than chunk_size" ) @@ -43,8 +50,16 @@ def chunk(self, text: str) -> List[str]: :return: List of text chunks with overlap """ if not text: + logger.debug("Empty text provided, returning empty chunk list") return [] + logger.debug( + "Chunking text: length=%d, chunk_size=%d, overlap=%d", + len(text), + self.chunk_size, + self.chunk_overlap, + ) + chunks = [] start = 0 @@ -67,4 +82,5 @@ def chunk(self, text: str) -> List[str]: break start = end - self.chunk_overlap + logger.debug("Created %d chunks from text", len(chunks)) return chunks From 1556d7302d3c3a5c89e12e6109b1c26c00f7fb3b Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Wed, 3 Dec 2025 13:00:06 +0000 Subject: [PATCH 11/14] Update to remove redundant chunk_overlap parameter --- .../presidio_analyzer/chunkers/chunking_utils.py | 5 +---- presidio-analyzer/tests/test_chunking_utils.py | 6 +++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py index 5c2dc2b48..f8ae9cdd5 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py @@ -26,7 +26,6 @@ def predict_with_chunking( text=text, chunker=chunker, process_func=predict_func, - chunk_overlap=chunker.chunk_overlap, ) return deduplicate_overlapping_entities(predictions) @@ -34,14 +33,12 @@ def process_text_in_chunks( text: str, chunker: BaseTextChunker, process_func: Callable[[str], List[Dict[str, Any]]], - chunk_overlap: int, ) -> List[Dict[str, Any]]: """Process text in chunks and adjust entity offsets.
:param text: Input text to process :param chunker: Text chunking strategy :param process_func: Function that takes chunk text and returns predictions - :param chunk_overlap: Number of characters overlapping between chunks :return: List of predictions with adjusted offsets """ chunks = chunker.chunk(text) @@ -57,7 +54,7 @@ def process_text_in_chunks( pred["end"] += offset all_predictions.extend(chunk_predictions) - offset += len(chunk) - chunk_overlap + offset += len(chunk) - chunker.chunk_overlap return all_predictions diff --git a/presidio-analyzer/tests/test_chunking_utils.py b/presidio-analyzer/tests/test_chunking_utils.py index 3b19129d4..803b11f04 100644 --- a/presidio-analyzer/tests/test_chunking_utils.py +++ b/presidio-analyzer/tests/test_chunking_utils.py @@ -17,7 +17,7 @@ def test_short_text_no_chunking(self): text = "Short text" predict_func = lambda chunk: [{"start": 0, "end": 5, "label": "PERSON", "score": 0.9}] - result = process_text_in_chunks(text, chunker, predict_func, chunk_overlap=20) + result = process_text_in_chunks(text, chunker, predict_func) assert len(result) == 1 assert result[0]["start"] == 0 @@ -37,7 +37,7 @@ def predict_func(chunk): return [{"start": idx, "end": idx + 8, "label": "PERSON", "score": 0.85}] return [] - result = process_text_in_chunks(text, chunker, predict_func, chunk_overlap=5) + result = process_text_in_chunks(text, chunker, predict_func) # First entity should be at original position assert result[0]["start"] == 0 @@ -51,7 +51,7 @@ def test_empty_predictions(self): text = "Some text without entities" predict_func = lambda chunk: [] - result = process_text_in_chunks(text, chunker, predict_func, chunk_overlap=10) + result = process_text_in_chunks(text, chunker, predict_func) assert result == [] From d722aaa315635013652a5b840fa85b162c8d00b9 Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Tue, 6 Jan 2026 09:51:49 +0000 Subject: [PATCH 12/14] Remove chunk size and chunk overlap from GlinerRecognizer constructor 
--- .../chunkers/base_chunker.py | 26 ++++++- .../chunkers/character_based_text_chunker.py | 38 ++++++--- .../chunkers/chunking_utils.py | 30 ++++---- .../ner/gliner_recognizer.py | 13 +--- .../test_character_based_text_chunker.py | 77 +++++++++++++------ .../tests/test_chunking_utils.py | 9 ++- .../tests/test_gliner_recognizer.py | 9 +-- 7 files changed, 129 insertions(+), 73 deletions(-) diff --git a/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py index e9e331007..09a8d0222 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py @@ -1,16 +1,34 @@ """Abstract base class for text chunking strategies.""" from abc import ABC, abstractmethod +from dataclasses import dataclass from typing import List +@dataclass +class TextChunk: + """Represents a chunk of text with its position in the original text. + + :param text: The chunk content + :param start: Start position in the original text (inclusive) + :param end: End position in the original text (exclusive) + """ + text: str + start: int + end: int + + class BaseTextChunker(ABC): - """Abstract base class for text chunking strategies.""" + """Abstract base class for text chunking strategies. + + Subclasses must implement the chunk() method to split text into + TextChunk objects that include both content and position information. + """ @abstractmethod - def chunk(self, text: str) -> List[str]: - """Split text into chunks. + def chunk(self, text: str) -> List[TextChunk]: + """Split text into chunks with position information. 
:param text: The input text to split - :return: List of text chunks + :return: List of TextChunk objects with text and position data """ pass diff --git a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py index 90f5242f2..be8a12c80 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py @@ -6,7 +6,7 @@ import logging from typing import List -from presidio_analyzer.chunkers.base_chunker import BaseTextChunker +from presidio_analyzer.chunkers.base_chunker import BaseTextChunker, TextChunk logger = logging.getLogger("presidio-analyzer") @@ -35,10 +35,26 @@ def __init__(self, chunk_size: int, chunk_overlap: int = 0): "chunk_overlap must be non-negative and less than chunk_size" ) - self.chunk_size = chunk_size - self.chunk_overlap = chunk_overlap + self._chunk_size = chunk_size + self._chunk_overlap = chunk_overlap - def chunk(self, text: str) -> List[str]: + @property + def chunk_size(self) -> int: + """Get the chunk size. + + :return: The chunk size + """ + return self._chunk_size + + @property + def chunk_overlap(self) -> int: + """Get the chunk overlap. + + :return: The chunk overlap + """ + return self._chunk_overlap + + def chunk(self, text: str) -> List[TextChunk]: """Split text into overlapping chunks at word boundaries. Chunks are extended to the nearest word boundary (space or newline) @@ -47,7 +63,7 @@ def chunk(self, text: str) -> List[str]: may extend to end of text. 
:param text: The input text to chunk - :return: List of text chunks with overlap + :return: List of TextChunk objects with text and position information """ if not text: logger.debug("Empty text provided, returning empty chunk list") @@ -56,8 +72,8 @@ def chunk(self, text: str) -> List[str]: logger.debug( "Chunking text: length=%d, chunk_size=%d, overlap=%d", len(text), - self.chunk_size, - self.chunk_overlap, + self._chunk_size, + self._chunk_overlap, ) chunks = [] @@ -66,8 +82,8 @@ def chunk(self, text: str) -> List[str]: while start < len(text): # Calculate end position end = ( - start + self.chunk_size - if start + self.chunk_size < len(text) + start + self._chunk_size + if start + self._chunk_size < len(text) else len(text) ) @@ -75,12 +91,12 @@ def chunk(self, text: str) -> List[str]: while end < len(text) and text[end] not in [" ", "\n"]: end += 1 - chunks.append(text[start:end]) + chunks.append(TextChunk(text=text[start:end], start=start, end=end)) # Move start position with overlap (stop if we've covered all text) if end >= len(text): break - start = end - self.chunk_overlap + start = end - self._chunk_overlap logger.debug("Created %d chunks from text", len(chunks)) return chunks diff --git a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py index f8ae9cdd5..0c3445f13 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py @@ -1,7 +1,7 @@ """Utility functions for processing text with chunking strategies.""" from typing import Any, Callable, Dict, List -from presidio_analyzer.chunkers.base_chunker import BaseTextChunker +from presidio_analyzer.chunkers.base_chunker import BaseTextChunker, TextChunk def predict_with_chunking( @@ -11,50 +11,46 @@ def predict_with_chunking( ) -> List[Dict[str, Any]]: """Process text with automatic chunking for long texts. 
- For short text (≤ chunker.chunk_size), calls predict_func directly. + For short text, calls predict_func directly. For long text, chunks it and merges predictions with deduplication. :param text: Input text to process :param predict_func: Function that takes text and returns predictions - :param chunker: Text chunking strategy (contains chunk_size and chunk_overlap) + :param chunker: Text chunking strategy :return: List of predictions with correct offsets """ - if len(text) <= chunker.chunk_size: + # Try direct prediction first for potentially short texts + chunks = chunker.chunk(text) + if len(chunks) <= 1: return predict_func(text) predictions = process_text_in_chunks( - text=text, - chunker=chunker, + chunks=chunks, process_func=predict_func, ) return deduplicate_overlapping_entities(predictions) def process_text_in_chunks( - text: str, - chunker: BaseTextChunker, + chunks: List[TextChunk], process_func: Callable[[str], List[Dict[str, Any]]], ) -> List[Dict[str, Any]]: - """Process text in chunks and adjust entity offsets. + """Process text chunks and adjust entity offsets. 
- :param text: Input text to process - :param chunker: Text chunking strategy + :param chunks: List of TextChunk objects with text and position information :param process_func: Function that takes chunk text and returns predictions :return: List of predictions with adjusted offsets """ - chunks = chunker.chunk(text) all_predictions = [] - offset = 0 for chunk in chunks: - chunk_predictions = process_func(chunk) + chunk_predictions = process_func(chunk.text) # Adjust offsets to match original text position for pred in chunk_predictions: - pred["start"] += offset - pred["end"] += offset + pred["start"] += chunk.start + pred["end"] += chunk.start all_predictions.extend(chunk_predictions) - offset += len(chunk) - chunker.chunk_overlap return all_predictions diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py index e3c008517..cbe027154 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py @@ -40,8 +40,6 @@ def __init__( multi_label: bool = False, threshold: float = 0.30, map_location: str = "cpu", - chunk_size: int = 250, - chunk_overlap: int = 50, text_chunker: Optional[BaseTextChunker] = None, ): """GLiNER model based entity recognizer. @@ -62,12 +60,9 @@ def __init__( :param threshold: The threshold for the model's output (see GLiNER's documentation) :param map_location: The device to use for the model - :param chunk_size: Maximum character length for text chunks - (default: 250) - :param chunk_overlap: Characters to overlap between chunks - (default: 50) :param text_chunker: Custom text chunking strategy. 
If None, uses - CharacterBasedTextChunker + CharacterBasedTextChunker with default settings (chunk_size=250, + chunk_overlap=50) """ @@ -100,14 +95,12 @@ def __init__( self.flat_ner = flat_ner self.multi_label = multi_label self.threshold = threshold - self.chunk_size = chunk_size - self.chunk_overlap = chunk_overlap # Use provided chunker or default to CharacterBasedTextChunker self.text_chunker = ( text_chunker if text_chunker is not None - else CharacterBasedTextChunker(chunk_size, chunk_overlap) + else CharacterBasedTextChunker(chunk_size=250, chunk_overlap=50) ) self.gliner = None diff --git a/presidio-analyzer/tests/test_character_based_text_chunker.py b/presidio-analyzer/tests/test_character_based_text_chunker.py index e8fae2994..04cca2367 100644 --- a/presidio-analyzer/tests/test_character_based_text_chunker.py +++ b/presidio-analyzer/tests/test_character_based_text_chunker.py @@ -19,7 +19,9 @@ def test_short_text(self): text = "This is a short text." result = chunker.chunk(text) assert len(result) == 1 - assert result[0] == text + assert result[0].text == text + assert result[0].start == 0 + assert result[0].end == len(text) def test_long_text_without_overlap(self): """Test long text with no overlap.""" @@ -28,8 +30,12 @@ def test_long_text_without_overlap(self): result = chunker.chunk(text) # Actual behavior: word boundaries extend chunks: ["1 2", " 3 4"] assert len(result) == 2 - assert result[0] == "1 2" - assert result[1] == " 3 4" + assert result[0].text == "1 2" + assert result[0].start == 0 + assert result[0].end == 3 + assert result[1].text == " 3 4" + assert result[1].start == 3 + assert result[1].end == 7 def test_long_text_with_overlap(self): """Test long text with overlap.""" @@ -38,10 +44,14 @@ def test_long_text_with_overlap(self): result = chunker.chunk(text) assert len(result) == 2 - assert result[0] == "1 3 5" - assert result[1] == " 5 7 9" + assert result[0].text == "1 3 5" + assert result[0].start == 0 + assert result[0].end == 5 + 
assert result[1].text == " 5 7 9" + assert result[1].start == 3 + assert result[1].end == 9 # Verify overlap exists - assert result[0].endswith(" 5") and result[1].startswith(" 5") + assert result[0].text.endswith(" 5") and result[1].text.startswith(" 5") def test_word_boundary_preservation(self): """Test that chunks extend to word boundaries.""" @@ -49,7 +59,9 @@ def test_word_boundary_preservation(self): text = "one two three four" result = chunker.chunk(text) # Chunks extend to word boundaries: "one two three" (13 chars) instead of breaking at 8 - assert result[0] == "one two three" + assert result[0].text == "one two three" + assert result[0].start == 0 + assert result[0].end == 13 assert len(result) == 2 def test_exact_chunk_size(self): @@ -58,7 +70,9 @@ def test_exact_chunk_size(self): text = "1 2 3" result = chunker.chunk(text) assert len(result) == 1 - assert result[0] == text + assert result[0].text == text + assert result[0].start == 0 + assert result[0].end == len(text) def test_validation_zero_chunk_size(self): """Test that chunk_size must be > 0.""" @@ -98,11 +112,17 @@ def test_multiple_chunks_coverage(self): result = chunker.chunk(text) # Actual result: ['1 2 3', '3 4 5', '5 6'] assert len(result) == 3 - assert result[0] == "1 2 3" - assert result[1] == "3 4 5" - assert result[2] == "5 6" + assert result[0].text == "1 2 3" + assert result[0].start == 0 + assert result[0].end == 5 + assert result[1].text == "3 4 5" + assert result[1].start == 4 + assert result[1].end == 9 + assert result[2].text == "5 6" + assert result[2].start == 8 + assert result[2].end == 11 # Verify all digits appear (overlap causes duplication in joined string) - all_text = "".join(result) + all_text = "".join([c.text for c in result]) for digit in ["1", "2", "3", "4", "5", "6"]: assert digit in all_text @@ -114,10 +134,14 @@ def test_newline_handling(self): # Chunk 1: "line1\nline2" (12 chars, extends to newline boundary at position 11) # Chunk 2: "\nline3" (remaining 6 chars) 
assert len(result) == 2 - assert result[0] == "line1\nline2" - assert result[1] == "\nline3" + assert result[0].text == "line1\nline2" + assert result[0].start == 0 + assert result[0].end == 11 + assert result[1].text == "\nline3" + assert result[1].start == 11 + assert result[1].end == 17 # Verify complete text preserved - assert "".join(result) == text + assert "".join([c.text for c in result]) == text def test_default_parameters(self): """Test chunker with default overlap (0).""" @@ -127,8 +151,12 @@ def test_default_parameters(self): # Chunk 1: "1 2 3" (5 chars, extends to word boundary at position 4) # Chunk 2: starts at position 5: " 4" (remaining) assert len(result) == 2 - assert result[0] == "1 2 3" - assert result[1] == " 4" + assert result[0].text == "1 2 3" + assert result[0].start == 0 + assert result[0].end == 5 + assert result[1].text == " 4" + assert result[1].start == 5 + assert result[1].end == 7 def test_very_long_text(self): """Test chunking very long text.""" @@ -139,11 +167,12 @@ def test_very_long_text(self): # With chunk_size=10, overlap=2, word boundaries: creates 16 chunks assert len(result) == 16 # First chunk - assert result[0] == "0 1 2 3 4 5" + assert result[0].text == "0 1 2 3 4 5" + assert result[0].start == 0 # Last chunk - assert result[-1] == "48 49" + assert result[-1].text == "48 49" # Verify all numbers appear in chunks - all_text = " ".join(result) + all_text = " ".join([c.text for c in result]) for i in range(50): assert str(i) in all_text @@ -158,7 +187,7 @@ def test_real_world_example(self): # Text is 251 chars, creates 2 chunks with overlap assert len(result) == 2 # All PII should be present across chunks - all_text = " ".join(result) + all_text = " ".join([c.text for c in result]) assert "4532-1234-5678-9010" in all_text assert "123-45-6789" in all_text assert "john.smith@example.com" in all_text @@ -172,7 +201,9 @@ def test_cjk_text_without_spaces(self): # No spaces, so first chunk extends all the way to end # (word 
boundary extension continues until end of text) assert len(result) == 1 - assert result[0] == text + assert result[0].text == text + assert result[0].start == 0 + assert result[0].end == len(text) def test_unicode_emoji_handling(self): """Test Unicode characters and emojis are handled correctly.""" @@ -180,7 +211,7 @@ def test_unicode_emoji_handling(self): text = "Hello 👋 World 🌍 Test" result = chunker.chunk(text) # Verify emojis are preserved in chunks - all_text = "".join(result) + all_text = "".join([c.text for c in result]) assert "👋" in all_text assert "🌍" in all_text # Verify all words appear (overlap may cause partial duplication) diff --git a/presidio-analyzer/tests/test_chunking_utils.py b/presidio-analyzer/tests/test_chunking_utils.py index 803b11f04..bb19114e6 100644 --- a/presidio-analyzer/tests/test_chunking_utils.py +++ b/presidio-analyzer/tests/test_chunking_utils.py @@ -15,9 +15,10 @@ def test_short_text_no_chunking(self): """Test text shorter than chunk size is not chunked.""" chunker = CharacterBasedTextChunker(chunk_size=100, chunk_overlap=20) text = "Short text" + chunks = chunker.chunk(text) predict_func = lambda chunk: [{"start": 0, "end": 5, "label": "PERSON", "score": 0.9}] - result = process_text_in_chunks(text, chunker, predict_func) + result = process_text_in_chunks(chunks, predict_func) assert len(result) == 1 assert result[0]["start"] == 0 @@ -27,6 +28,7 @@ def test_long_text_with_offset_adjustment(self): """Test offset adjustment for chunked text.""" chunker = CharacterBasedTextChunker(chunk_size=20, chunk_overlap=5) text = "John Smith lives in New York City with Jane Doe" + chunks = chunker.chunk(text) # Mock predict function that finds entities in each chunk def predict_func(chunk): @@ -37,7 +39,7 @@ def predict_func(chunk): return [{"start": idx, "end": idx + 8, "label": "PERSON", "score": 0.85}] return [] - result = process_text_in_chunks(text, chunker, predict_func) + result = process_text_in_chunks(chunks, predict_func) # First 
entity should be at original position assert result[0]["start"] == 0 @@ -49,9 +51,10 @@ def test_empty_predictions(self): """Test handling of no predictions.""" chunker = CharacterBasedTextChunker(chunk_size=50, chunk_overlap=10) text = "Some text without entities" + chunks = chunker.chunk(text) predict_func = lambda chunk: [] - result = process_text_in_chunks(text, chunker, predict_func) + result = process_text_in_chunks(chunks, predict_func) assert result == [] diff --git a/presidio-analyzer/tests/test_gliner_recognizer.py b/presidio-analyzer/tests/test_gliner_recognizer.py index 916ecd91f..2288c9c64 100644 --- a/presidio-analyzer/tests/test_gliner_recognizer.py +++ b/presidio-analyzer/tests/test_gliner_recognizer.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock, patch from presidio_analyzer.predefined_recognizers import GLiNERRecognizer +from presidio_analyzer.chunkers import CharacterBasedTextChunker @pytest.fixture @@ -155,7 +156,7 @@ def mock_predict_entities(text, labels, flat_ner, threshold, multi_label): gliner_recognizer = GLiNERRecognizer( entity_mapping={"person": "PERSON"}, - chunk_size=250, + text_chunker=CharacterBasedTextChunker(chunk_size=250, chunk_overlap=50), ) gliner_recognizer.gliner = mock_gliner @@ -197,8 +198,7 @@ def mock_predict_entities(text, labels, flat_ner, threshold, multi_label): gliner_recognizer = GLiNERRecognizer( entity_mapping={"person": "PERSON"}, - chunk_size=250, - chunk_overlap=50, + text_chunker=CharacterBasedTextChunker(chunk_size=250, chunk_overlap=50), ) gliner_recognizer.gliner = mock_gliner @@ -234,8 +234,7 @@ def mock_predict_entities(text, labels, flat_ner, threshold, multi_label): gliner_recognizer = GLiNERRecognizer( entity_mapping={"person": "PERSON"}, - chunk_size=250, - chunk_overlap=50, + text_chunker=CharacterBasedTextChunker(chunk_size=250, chunk_overlap=50), ) gliner_recognizer.gliner = mock_gliner From 8f637dec8d39fb4b2c03b68f4097bb7bdbb0c706 Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: 
Wed, 7 Jan 2026 18:41:51 +0000 Subject: [PATCH 13/14] Updated the utilities to use RecognizerResult --- .../presidio_analyzer/chunkers/__init__.py | 3 +- .../chunkers/chunking_utils.py | 46 ++++++------- .../ner/gliner_recognizer.py | 59 ++++++++-------- .../tests/test_chunking_utils.py | 67 ++++++++++--------- 4 files changed, 94 insertions(+), 81 deletions(-) diff --git a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py index d73e4032d..9b695c4c7 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py @@ -1,6 +1,6 @@ """Text chunking strategies for handling long texts.""" -from presidio_analyzer.chunkers.base_chunker import BaseTextChunker +from presidio_analyzer.chunkers.base_chunker import BaseTextChunker, TextChunk from presidio_analyzer.chunkers.character_based_text_chunker import ( CharacterBasedTextChunker, ) @@ -12,6 +12,7 @@ __all__ = [ "BaseTextChunker", + "TextChunk", "CharacterBasedTextChunker", "predict_with_chunking", "process_text_in_chunks", diff --git a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py index 0c3445f13..9faff2e95 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py @@ -1,23 +1,24 @@ """Utility functions for processing text with chunking strategies.""" -from typing import Any, Callable, Dict, List +from typing import Callable, List +from presidio_analyzer import RecognizerResult from presidio_analyzer.chunkers.base_chunker import BaseTextChunker, TextChunk def predict_with_chunking( text: str, - predict_func: Callable[[str], List[Dict[str, Any]]], + predict_func: Callable[[str], List[RecognizerResult]], chunker: BaseTextChunker, -) -> List[Dict[str, Any]]: +) -> List[RecognizerResult]: """Process text with automatic chunking 
for long texts. For short text, calls predict_func directly. For long text, chunks it and merges predictions with deduplication. :param text: Input text to process - :param predict_func: Function that takes text and returns predictions + :param predict_func: Function that takes text and returns RecognizerResult objects :param chunker: Text chunking strategy - :return: List of predictions with correct offsets + :return: List of RecognizerResult with correct offsets """ # Try direct prediction first for potentially short texts chunks = chunker.chunk(text) @@ -32,13 +33,13 @@ def predict_with_chunking( def process_text_in_chunks( chunks: List[TextChunk], - process_func: Callable[[str], List[Dict[str, Any]]], -) -> List[Dict[str, Any]]: + process_func: Callable[[str], List[RecognizerResult]], +) -> List[RecognizerResult]: """Process text chunks and adjust entity offsets. :param chunks: List of TextChunk objects with text and position information - :param process_func: Function that takes chunk text and returns predictions - :return: List of predictions with adjusted offsets + :param process_func: Function that takes chunk text and returns RecognizerResult objects + :return: List of RecognizerResult with adjusted offsets """ all_predictions = [] @@ -47,44 +48,43 @@ def process_text_in_chunks( # Adjust offsets to match original text position for pred in chunk_predictions: - pred["start"] += chunk.start - pred["end"] += chunk.start + pred.start += chunk.start + pred.end += chunk.start all_predictions.extend(chunk_predictions) return all_predictions def deduplicate_overlapping_entities( - predictions: List[Dict[str, Any]], overlap_threshold: float = 0.5 -) -> List[Dict[str, Any]]: + predictions: List[RecognizerResult], overlap_threshold: float = 0.5 +) -> List[RecognizerResult]: """Remove duplicate entities from overlapping chunks. 
- :param predictions: List of predictions with 'start', 'end', 'label', - 'score' + :param predictions: List of RecognizerResult objects :param overlap_threshold: Overlap ratio threshold to consider duplicates (default: 0.5) - :return: Deduplicated list of predictions sorted by position + :return: Deduplicated list of RecognizerResult sorted by position """ if not predictions: return predictions # Sort by score descending to keep highest scoring entities - sorted_preds = sorted(predictions, key=lambda p: p["score"], reverse=True) + sorted_preds = sorted(predictions, key=lambda p: p.score, reverse=True) unique = [] for pred in sorted_preds: is_duplicate = False for kept in unique: # Check if same entity type and overlapping positions - if pred["label"] == kept["label"]: - overlap_start = max(pred["start"], kept["start"]) - overlap_end = min(pred["end"], kept["end"]) + if pred.entity_type == kept.entity_type: + overlap_start = max(pred.start, kept.start) + overlap_end = min(pred.end, kept.end) if overlap_start < overlap_end: # Calculate overlap ratio overlap_len = overlap_end - overlap_start - pred_len = pred["end"] - pred["start"] - kept_len = kept["end"] - kept["start"] + pred_len = pred.end - pred.start + kept_len = kept.end - kept.start # Check if overlap exceeds threshold if overlap_len / min(pred_len, kept_len) > overlap_threshold: @@ -95,4 +95,4 @@ def deduplicate_overlapping_entities( unique.append(pred) # Sort by position for consistent output - return sorted(unique, key=lambda p: p["start"]) + return sorted(unique, key=lambda p: p.start) diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py index cbe027154..7ed7bd3d0 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py @@ -141,14 +141,43 @@ def 
analyze( labels = self.__create_input_labels(entities) # Process text with automatic chunking - def predict_func(text: str) -> List[Dict[str, Any]]: - return self.gliner.predict_entities( + def predict_func(text: str) -> List[RecognizerResult]: + # Get predictions from GLiNER (returns dicts) + gliner_predictions = self.gliner.predict_entities( text=text, labels=labels, flat_ner=self.flat_ner, threshold=self.threshold, multi_label=self.multi_label, ) + + # Convert dicts to RecognizerResult objects + results = [] + for pred in gliner_predictions: + presidio_entity = self.model_to_presidio_entity_mapping.get( + pred["label"], pred["label"] + ) + + # Filter by requested entities + if entities and presidio_entity not in entities: + continue + + analysis_explanation = AnalysisExplanation( + recognizer=self.name, + original_score=pred["score"], + textual_explanation=f"Identified as {presidio_entity} by GLiNER", + ) + + results.append( + RecognizerResult( + entity_type=presidio_entity, + start=pred["start"], + end=pred["end"], + score=pred["score"], + analysis_explanation=analysis_explanation, + ) + ) + return results predictions = predict_with_chunking( text=text, @@ -156,31 +185,7 @@ def predict_func(text: str) -> List[Dict[str, Any]]: chunker=self.text_chunker, ) - recognizer_results = [] - for prediction in predictions: - presidio_entity = self.model_to_presidio_entity_mapping.get( - prediction["label"], prediction["label"] - ) - if entities and presidio_entity not in entities: - continue - - analysis_explanation = AnalysisExplanation( - recognizer=self.name, - original_score=prediction["score"], - textual_explanation=f"Identified as {presidio_entity} by GLiNER", - ) - - recognizer_results.append( - RecognizerResult( - entity_type=presidio_entity, - start=prediction["start"], - end=prediction["end"], - score=prediction["score"], - analysis_explanation=analysis_explanation, - ) - ) - - return recognizer_results + return predictions def __create_input_labels(self, 
entities): """Append the entities requested by the user to the list of labels if it's not there.""" # noqa: E501 diff --git a/presidio-analyzer/tests/test_chunking_utils.py b/presidio-analyzer/tests/test_chunking_utils.py index bb19114e6..b182c3fe1 100644 --- a/presidio-analyzer/tests/test_chunking_utils.py +++ b/presidio-analyzer/tests/test_chunking_utils.py @@ -1,6 +1,7 @@ """Tests for chunking utility functions.""" import pytest +from presidio_analyzer import RecognizerResult from presidio_analyzer.chunkers import ( CharacterBasedTextChunker, process_text_in_chunks, @@ -16,13 +17,15 @@ def test_short_text_no_chunking(self): chunker = CharacterBasedTextChunker(chunk_size=100, chunk_overlap=20) text = "Short text" chunks = chunker.chunk(text) - predict_func = lambda chunk: [{"start": 0, "end": 5, "label": "PERSON", "score": 0.9}] + predict_func = lambda chunk: [RecognizerResult( + entity_type="PERSON", start=0, end=5, score=0.9 + )] result = process_text_in_chunks(chunks, predict_func) assert len(result) == 1 - assert result[0]["start"] == 0 - assert result[0]["end"] == 5 + assert result[0].start == 0 + assert result[0].end == 5 def test_long_text_with_offset_adjustment(self): """Test offset adjustment for chunked text.""" @@ -33,19 +36,23 @@ def test_long_text_with_offset_adjustment(self): # Mock predict function that finds entities in each chunk def predict_func(chunk): if "John" in chunk: - return [{"start": 0, "end": 10, "label": "PERSON", "score": 0.9}] + return [RecognizerResult( + entity_type="PERSON", start=0, end=10, score=0.9 + )] elif "Jane" in chunk: idx = chunk.index("Jane") - return [{"start": idx, "end": idx + 8, "label": "PERSON", "score": 0.85}] + return [RecognizerResult( + entity_type="PERSON", start=idx, end=idx + 8, score=0.85 + )] return [] result = process_text_in_chunks(chunks, predict_func) # First entity should be at original position - assert result[0]["start"] == 0 - assert result[0]["end"] == 10 + assert result[0].start == 0 + assert 
result[0].end == 10 # Second entity should have adjusted offset - assert result[1]["start"] > 20 # In second chunk + assert result[1].start > 20 # In second chunk def test_empty_predictions(self): """Test handling of no predictions.""" @@ -65,46 +72,46 @@ class TestDeduplicateOverlappingEntities: def test_no_duplicates(self): """Test predictions with no overlap.""" predictions = [ - {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, - {"start": 20, "end": 30, "label": "PERSON", "score": 0.85}, + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), + RecognizerResult(entity_type="PERSON", start=20, end=30, score=0.85), ] result = deduplicate_overlapping_entities(predictions) assert len(result) == 2 - assert result[0]["start"] == 0 - assert result[1]["start"] == 20 + assert result[0].start == 0 + assert result[1].start == 20 def test_exact_duplicates_keeps_highest_score(self): """Test exact duplicates keeps highest scoring entity.""" predictions = [ - {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, - {"start": 0, "end": 10, "label": "PERSON", "score": 0.85}, + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.85), ] result = deduplicate_overlapping_entities(predictions) assert len(result) == 1 - assert result[0]["score"] == 0.9 + assert result[0].score == 0.9 def test_overlapping_duplicates(self): """Test overlapping entities are deduplicated.""" predictions = [ - {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, - {"start": 3, "end": 13, "label": "PERSON", "score": 0.85}, + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), + RecognizerResult(entity_type="PERSON", start=3, end=13, score=0.85), ] result = deduplicate_overlapping_entities(predictions) # Overlap is 7 chars, ratio = 0.7 > 0.5 threshold assert len(result) == 1 - assert result[0]["score"] == 0.9 + assert result[0].score == 0.9 def 
test_different_labels_not_deduplicated(self): """Test overlapping entities with different labels are kept.""" predictions = [ - {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, - {"start": 5, "end": 15, "label": "LOCATION", "score": 0.85}, + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), + RecognizerResult(entity_type="LOCATION", start=5, end=15, score=0.85), ] result = deduplicate_overlapping_entities(predictions) @@ -114,8 +121,8 @@ def test_different_labels_not_deduplicated(self): def test_low_overlap_not_deduplicated(self): """Test entities with low overlap are not deduplicated.""" predictions = [ - {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, - {"start": 9, "end": 20, "label": "PERSON", "score": 0.85}, + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), + RecognizerResult(entity_type="PERSON", start=9, end=20, score=0.85), ] result = deduplicate_overlapping_entities(predictions, overlap_threshold=0.6) @@ -131,22 +138,22 @@ def test_empty_predictions(self): def test_sorted_by_position(self): """Test results are sorted by start position.""" predictions = [ - {"start": 20, "end": 30, "label": "PERSON", "score": 0.9}, - {"start": 0, "end": 10, "label": "PERSON", "score": 0.85}, - {"start": 40, "end": 50, "label": "PERSON", "score": 0.95}, + RecognizerResult(entity_type="PERSON", start=20, end=30, score=0.9), + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.85), + RecognizerResult(entity_type="PERSON", start=40, end=50, score=0.95), ] result = deduplicate_overlapping_entities(predictions) - assert result[0]["start"] == 0 - assert result[1]["start"] == 20 - assert result[2]["start"] == 40 + assert result[0].start == 0 + assert result[1].start == 20 + assert result[2].start == 40 def test_custom_overlap_threshold(self): """Test custom overlap threshold.""" predictions = [ - {"start": 0, "end": 10, "label": "PERSON", "score": 0.9}, - {"start": 5, "end": 15, "label": "PERSON", "score": 0.85}, 
+ RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), + RecognizerResult(entity_type="PERSON", start=5, end=15, score=0.85), ] # With 0.3 threshold, should deduplicate (overlap ratio = 0.5) From 86f16c102259ab1b67561deac4e4d911c85d1394 Mon Sep 17 00:00:00 2001 From: "AJ (Ashitosh Jedhe)" Date: Thu, 8 Jan 2026 10:59:54 +0000 Subject: [PATCH 14/14] Update so that utils methods are part of base chunker --- .../presidio_analyzer/chunkers/__init__.py | 27 ++- .../chunkers/base_chunker.py | 98 ++++++++++- .../chunkers/character_based_text_chunker.py | 28 ++- .../chunkers/chunking_utils.py | 98 ----------- .../ner/gliner_recognizer.py | 4 +- presidio-analyzer/tests/test_base_chunker.py | 94 ++++++++++ .../test_character_based_text_chunker.py | 21 +-- .../tests/test_chunking_utils.py | 165 ------------------ 8 files changed, 244 insertions(+), 291 deletions(-) delete mode 100644 presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py create mode 100644 presidio-analyzer/tests/test_base_chunker.py delete mode 100644 presidio-analyzer/tests/test_chunking_utils.py diff --git a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py index 9b695c4c7..26e0723ca 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/__init__.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/__init__.py @@ -4,17 +4,28 @@ from presidio_analyzer.chunkers.character_based_text_chunker import ( CharacterBasedTextChunker, ) -from presidio_analyzer.chunkers.chunking_utils import ( - deduplicate_overlapping_entities, - predict_with_chunking, - process_text_in_chunks, -) + +_CHUNKER_REGISTRY = { + "character": CharacterBasedTextChunker, +} + + +def create_chunker(kind: str, **kwargs) -> BaseTextChunker: + """Factory helper for chunker selection by name. + + Kept minimal to avoid over-abstraction while letting configs select a chunker. 
+ """ + + try: + cls = _CHUNKER_REGISTRY[kind] + except KeyError as exc: # pragma: no cover - defensive for config typos + raise ValueError(f"Unsupported chunker kind: {kind}") from exc + return cls(**kwargs) + __all__ = [ "BaseTextChunker", "TextChunk", "CharacterBasedTextChunker", - "predict_with_chunking", - "process_text_in_chunks", - "deduplicate_overlapping_entities", + "create_chunker", ] diff --git a/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py index 09a8d0222..b29de69c4 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/base_chunker.py @@ -1,7 +1,10 @@ """Abstract base class for text chunking strategies.""" from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import List +from typing import Callable, List, TYPE_CHECKING + +if TYPE_CHECKING: + from presidio_analyzer import RecognizerResult @dataclass @@ -22,6 +25,9 @@ class BaseTextChunker(ABC): Subclasses must implement the chunk() method to split text into TextChunk objects that include both content and position information. + + Provides methods for processing predictions across chunks and + deduplicating overlapping entities. """ @abstractmethod @@ -32,3 +38,93 @@ def chunk(self, text: str) -> List[TextChunk]: :return: List of TextChunk objects with text and position data """ pass + + def predict_with_chunking( + self, + text: str, + predict_func: Callable[[str], List["RecognizerResult"]], + ) -> List["RecognizerResult"]: + """Process text with automatic chunking for long texts. + + For short text, calls predict_func directly. + For long text, chunks it and merges predictions with deduplication. 
+ + :param text: Input text to process + :param predict_func: Function that takes text and returns RecognizerResult objects + :return: List of RecognizerResult with correct offsets + """ + chunks = self.chunk(text) + if len(chunks) <= 1: + return predict_func(text) + + predictions = self._process_chunks(chunks, predict_func) + return self.deduplicate_overlapping_entities(predictions) + + def _process_chunks( + self, + chunks: List[TextChunk], + process_func: Callable[[str], List["RecognizerResult"]], + ) -> List["RecognizerResult"]: + """Process text chunks and adjust entity offsets. + + :param chunks: List of TextChunk objects with text and position information + :param process_func: Function that takes chunk text and returns RecognizerResult objects + :return: List of RecognizerResult with adjusted offsets + """ + all_predictions = [] + + for chunk in chunks: + chunk_predictions = process_func(chunk.text) + + # Adjust offsets to match original text position + for pred in chunk_predictions: + pred.start += chunk.start + pred.end += chunk.start + + all_predictions.extend(chunk_predictions) + + return all_predictions + + def deduplicate_overlapping_entities( + self, + predictions: List["RecognizerResult"], + overlap_threshold: float = 0.5, + ) -> List["RecognizerResult"]: + """Remove duplicate entities from overlapping chunks. 
+ + :param predictions: List of RecognizerResult objects + :param overlap_threshold: Overlap ratio threshold to consider duplicates + (default: 0.5) + :return: Deduplicated list of RecognizerResult sorted by position + """ + if not predictions: + return predictions + + # Sort by score descending to keep highest scoring entities + sorted_preds = sorted(predictions, key=lambda p: p.score, reverse=True) + unique = [] + + for pred in sorted_preds: + is_duplicate = False + for kept in unique: + # Check if same entity type and overlapping positions + if pred.entity_type == kept.entity_type: + overlap_start = max(pred.start, kept.start) + overlap_end = min(pred.end, kept.end) + + if overlap_start < overlap_end: + # Calculate overlap ratio + overlap_len = overlap_end - overlap_start + pred_len = pred.end - pred.start + kept_len = kept.end - kept.start + + # Check if overlap exceeds threshold + if overlap_len / min(pred_len, kept_len) > overlap_threshold: + is_duplicate = True + break + + if not is_duplicate: + unique.append(pred) + + # Sort by position for consistent output + return sorted(unique, key=lambda p: p.start) diff --git a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py index be8a12c80..22a449ac9 100644 --- a/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py +++ b/presidio-analyzer/presidio_analyzer/chunkers/character_based_text_chunker.py @@ -4,17 +4,25 @@ https://github.com/theirstory/gliner-spacy/blob/main/gliner_spacy/pipeline.py#L60-L96 """ import logging -from typing import List +from typing import Iterable, List, Tuple from presidio_analyzer.chunkers.base_chunker import BaseTextChunker, TextChunk logger = logging.getLogger("presidio-analyzer") +WORD_BOUNDARY_CHARS: Tuple[str, ...] 
= (" ", "\n") + + class CharacterBasedTextChunker(BaseTextChunker): """Character-based text chunker with word boundary preservation.""" - def __init__(self, chunk_size: int, chunk_overlap: int = 0): + def __init__( + self, + chunk_size: int = 250, + chunk_overlap: int = 50, + boundary_chars: Iterable[str] | None = None, + ): """Initialize the character-based text chunker. Note: Chunks may slightly exceed chunk_size to preserve complete words. @@ -23,6 +31,8 @@ def __init__(self, chunk_size: int, chunk_overlap: int = 0): :param chunk_size: Target maximum characters per chunk (must be > 0) :param chunk_overlap: Target characters to overlap between chunks (must be >= 0 and < chunk_size) + :param boundary_chars: Characters that count as word boundaries. + Defaults to space/newline to keep current behavior. """ if chunk_size <= 0: logger.error("Invalid chunk_size: %d. Must be greater than 0.", chunk_size) @@ -37,6 +47,10 @@ def __init__(self, chunk_size: int, chunk_overlap: int = 0): self._chunk_size = chunk_size self._chunk_overlap = chunk_overlap + # Allow callers to tune boundaries (e.g., punctuation, tabs) without changing defaults. + self._boundary_chars: Tuple[str, ...] = ( + tuple(boundary_chars) if boundary_chars is not None else WORD_BOUNDARY_CHARS + ) @property def chunk_size(self) -> int: @@ -54,6 +68,12 @@ def chunk_overlap(self) -> int: """ return self._chunk_overlap + @property + def boundary_chars(self) -> Tuple[str, ...]: + """Characters treated as word boundaries when extending chunks.""" + + return self._boundary_chars + def chunk(self, text: str) -> List[TextChunk]: """Split text into overlapping chunks at word boundaries. 
@@ -87,8 +107,8 @@ def chunk(self, text: str) -> List[TextChunk]: else len(text) ) - # Extend to complete word boundary (space or newline) - while end < len(text) and text[end] not in [" ", "\n"]: + # Extend to complete word boundary (space or newline by default) + while end < len(text) and text[end] not in self._boundary_chars: end += 1 chunks.append(TextChunk(text=text[start:end], start=start, end=end)) diff --git a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py b/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py deleted file mode 100644 index 9faff2e95..000000000 --- a/presidio-analyzer/presidio_analyzer/chunkers/chunking_utils.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Utility functions for processing text with chunking strategies.""" -from typing import Callable, List - -from presidio_analyzer import RecognizerResult -from presidio_analyzer.chunkers.base_chunker import BaseTextChunker, TextChunk - - -def predict_with_chunking( - text: str, - predict_func: Callable[[str], List[RecognizerResult]], - chunker: BaseTextChunker, -) -> List[RecognizerResult]: - """Process text with automatic chunking for long texts. - - For short text, calls predict_func directly. - For long text, chunks it and merges predictions with deduplication. - - :param text: Input text to process - :param predict_func: Function that takes text and returns RecognizerResult objects - :param chunker: Text chunking strategy - :return: List of RecognizerResult with correct offsets - """ - # Try direct prediction first for potentially short texts - chunks = chunker.chunk(text) - if len(chunks) <= 1: - return predict_func(text) - - predictions = process_text_in_chunks( - chunks=chunks, - process_func=predict_func, - ) - return deduplicate_overlapping_entities(predictions) - -def process_text_in_chunks( - chunks: List[TextChunk], - process_func: Callable[[str], List[RecognizerResult]], -) -> List[RecognizerResult]: - """Process text chunks and adjust entity offsets. 
- - :param chunks: List of TextChunk objects with text and position information - :param process_func: Function that takes chunk text and returns RecognizerResult objects - :return: List of RecognizerResult with adjusted offsets - """ - all_predictions = [] - - for chunk in chunks: - chunk_predictions = process_func(chunk.text) - - # Adjust offsets to match original text position - for pred in chunk_predictions: - pred.start += chunk.start - pred.end += chunk.start - - all_predictions.extend(chunk_predictions) - - return all_predictions - -def deduplicate_overlapping_entities( - predictions: List[RecognizerResult], overlap_threshold: float = 0.5 -) -> List[RecognizerResult]: - """Remove duplicate entities from overlapping chunks. - - :param predictions: List of RecognizerResult objects - :param overlap_threshold: Overlap ratio threshold to consider duplicates - (default: 0.5) - :return: Deduplicated list of RecognizerResult sorted by position - """ - if not predictions: - return predictions - - # Sort by score descending to keep highest scoring entities - sorted_preds = sorted(predictions, key=lambda p: p.score, reverse=True) - unique = [] - - for pred in sorted_preds: - is_duplicate = False - for kept in unique: - # Check if same entity type and overlapping positions - if pred.entity_type == kept.entity_type: - overlap_start = max(pred.start, kept.start) - overlap_end = min(pred.end, kept.end) - - if overlap_start < overlap_end: - # Calculate overlap ratio - overlap_len = overlap_end - overlap_start - pred_len = pred.end - pred.start - kept_len = kept.end - kept.start - - # Check if overlap exceeds threshold - if overlap_len / min(pred_len, kept_len) > overlap_threshold: - is_duplicate = True - break - - if not is_duplicate: - unique.append(pred) - - # Sort by position for consistent output - return sorted(unique, key=lambda p: p.start) diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py 
b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py index 7ed7bd3d0..28b340d63 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/ner/gliner_recognizer.py @@ -10,7 +10,6 @@ from presidio_analyzer.chunkers import ( BaseTextChunker, CharacterBasedTextChunker, - predict_with_chunking, ) from presidio_analyzer.nlp_engine import NerModelConfiguration, NlpArtifacts @@ -179,10 +178,9 @@ def predict_func(text: str) -> List[RecognizerResult]: ) return results - predictions = predict_with_chunking( + predictions = self.text_chunker.predict_with_chunking( text=text, predict_func=predict_func, - chunker=self.text_chunker, ) return predictions diff --git a/presidio-analyzer/tests/test_base_chunker.py b/presidio-analyzer/tests/test_base_chunker.py new file mode 100644 index 000000000..745daf4f8 --- /dev/null +++ b/presidio-analyzer/tests/test_base_chunker.py @@ -0,0 +1,94 @@ +"""Tests for BaseTextChunker methods.""" +import pytest + +from presidio_analyzer import RecognizerResult +from presidio_analyzer.chunkers import CharacterBasedTextChunker + + +class TestPredictWithChunking: + """Test predict_with_chunking orchestration.""" + + def test_short_text_not_chunked(self): + """Short text bypasses chunking.""" + chunker = CharacterBasedTextChunker(chunk_size=100, chunk_overlap=20) + predict_func = lambda t: [ + RecognizerResult(entity_type="PERSON", start=0, end=5, score=0.9) + ] + + result = chunker.predict_with_chunking("Short text", predict_func) + + assert len(result) == 1 + assert result[0].start == 0 + + def test_long_text_offsets_adjusted(self): + """Entity offsets are adjusted to original text positions.""" + chunker = CharacterBasedTextChunker(chunk_size=20, chunk_overlap=5) + text = "John Smith lives in New York City with Jane Doe" + + def predict_func(chunk): + if "Jane" in chunk: + idx = chunk.index("Jane") + return [ + 
RecognizerResult(entity_type="PERSON", start=idx, end=idx + 4, score=0.9) + ] + return [] + + result = chunker.predict_with_chunking(text, predict_func) + + # Jane appears at position 39 in original text + assert len(result) == 1 + assert result[0].start == text.index("Jane") + + +class TestDeduplicateOverlappingEntities: + """Test deduplication of overlapping entities from chunk boundaries.""" + + def test_exact_duplicates_keeps_highest_score(self): + """Same entity from overlapping chunks keeps higher score.""" + chunker = CharacterBasedTextChunker() + predictions = [ + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.7), + ] + + result = chunker.deduplicate_overlapping_entities(predictions) + + assert len(result) == 1 + assert result[0].score == 0.9 + + def test_overlapping_same_type_deduplicated(self): + """Overlapping entities of same type are deduplicated.""" + chunker = CharacterBasedTextChunker() + predictions = [ + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), + RecognizerResult(entity_type="PERSON", start=3, end=13, score=0.8), + ] + + result = chunker.deduplicate_overlapping_entities(predictions) + + assert len(result) == 1 + + def test_different_types_not_deduplicated(self): + """Overlapping entities of different types are kept.""" + chunker = CharacterBasedTextChunker() + predictions = [ + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), + RecognizerResult(entity_type="LOCATION", start=5, end=15, score=0.8), + ] + + result = chunker.deduplicate_overlapping_entities(predictions) + + assert len(result) == 2 + + def test_results_sorted_by_position(self): + """Results are sorted by start position.""" + chunker = CharacterBasedTextChunker() + predictions = [ + RecognizerResult(entity_type="PERSON", start=20, end=30, score=0.9), + RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.8), + ] + + result = 
chunker.deduplicate_overlapping_entities(predictions) + + assert result[0].start == 0 + assert result[1].start == 20 diff --git a/presidio-analyzer/tests/test_character_based_text_chunker.py b/presidio-analyzer/tests/test_character_based_text_chunker.py index 04cca2367..af95c14a5 100644 --- a/presidio-analyzer/tests/test_character_based_text_chunker.py +++ b/presidio-analyzer/tests/test_character_based_text_chunker.py @@ -144,19 +144,16 @@ def test_newline_handling(self): assert "".join([c.text for c in result]) == text def test_default_parameters(self): - """Test chunker with default overlap (0).""" - chunker = CharacterBasedTextChunker(chunk_size=5) # No overlap specified (default=0) - text = "1 2 3 4" # 7 chars + """Test chunker with default parameters (250 chunk_size, 50 overlap).""" + chunker = CharacterBasedTextChunker() # Uses defaults: chunk_size=250, chunk_overlap=50 + assert chunker.chunk_size == 250 + assert chunker.chunk_overlap == 50 + + # Short text should produce single chunk + text = "Short text for testing" result = chunker.chunk(text) - # Chunk 1: "1 2 3" (5 chars, extends to word boundary at position 4) - # Chunk 2: starts at position 5: " 4" (remaining) - assert len(result) == 2 - assert result[0].text == "1 2 3" - assert result[0].start == 0 - assert result[0].end == 5 - assert result[1].text == " 4" - assert result[1].start == 5 - assert result[1].end == 7 + assert len(result) == 1 + assert result[0].text == text def test_very_long_text(self): """Test chunking very long text.""" diff --git a/presidio-analyzer/tests/test_chunking_utils.py b/presidio-analyzer/tests/test_chunking_utils.py deleted file mode 100644 index b182c3fe1..000000000 --- a/presidio-analyzer/tests/test_chunking_utils.py +++ /dev/null @@ -1,165 +0,0 @@ -"""Tests for chunking utility functions.""" -import pytest - -from presidio_analyzer import RecognizerResult -from presidio_analyzer.chunkers import ( - CharacterBasedTextChunker, - process_text_in_chunks, - 
deduplicate_overlapping_entities, -) - - -class TestProcessTextInChunks: - """Test process_text_in_chunks utility function.""" - - def test_short_text_no_chunking(self): - """Test text shorter than chunk size is not chunked.""" - chunker = CharacterBasedTextChunker(chunk_size=100, chunk_overlap=20) - text = "Short text" - chunks = chunker.chunk(text) - predict_func = lambda chunk: [RecognizerResult( - entity_type="PERSON", start=0, end=5, score=0.9 - )] - - result = process_text_in_chunks(chunks, predict_func) - - assert len(result) == 1 - assert result[0].start == 0 - assert result[0].end == 5 - - def test_long_text_with_offset_adjustment(self): - """Test offset adjustment for chunked text.""" - chunker = CharacterBasedTextChunker(chunk_size=20, chunk_overlap=5) - text = "John Smith lives in New York City with Jane Doe" - chunks = chunker.chunk(text) - - # Mock predict function that finds entities in each chunk - def predict_func(chunk): - if "John" in chunk: - return [RecognizerResult( - entity_type="PERSON", start=0, end=10, score=0.9 - )] - elif "Jane" in chunk: - idx = chunk.index("Jane") - return [RecognizerResult( - entity_type="PERSON", start=idx, end=idx + 8, score=0.85 - )] - return [] - - result = process_text_in_chunks(chunks, predict_func) - - # First entity should be at original position - assert result[0].start == 0 - assert result[0].end == 10 - # Second entity should have adjusted offset - assert result[1].start > 20 # In second chunk - - def test_empty_predictions(self): - """Test handling of no predictions.""" - chunker = CharacterBasedTextChunker(chunk_size=50, chunk_overlap=10) - text = "Some text without entities" - chunks = chunker.chunk(text) - predict_func = lambda chunk: [] - - result = process_text_in_chunks(chunks, predict_func) - - assert result == [] - - -class TestDeduplicateOverlappingEntities: - """Test deduplicate_overlapping_entities utility function.""" - - def test_no_duplicates(self): - """Test predictions with no overlap.""" - 
predictions = [ - RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), - RecognizerResult(entity_type="PERSON", start=20, end=30, score=0.85), - ] - - result = deduplicate_overlapping_entities(predictions) - - assert len(result) == 2 - assert result[0].start == 0 - assert result[1].start == 20 - - def test_exact_duplicates_keeps_highest_score(self): - """Test exact duplicates keeps highest scoring entity.""" - predictions = [ - RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), - RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.85), - ] - - result = deduplicate_overlapping_entities(predictions) - - assert len(result) == 1 - assert result[0].score == 0.9 - - def test_overlapping_duplicates(self): - """Test overlapping entities are deduplicated.""" - predictions = [ - RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), - RecognizerResult(entity_type="PERSON", start=3, end=13, score=0.85), - ] - - result = deduplicate_overlapping_entities(predictions) - - # Overlap is 7 chars, ratio = 0.7 > 0.5 threshold - assert len(result) == 1 - assert result[0].score == 0.9 - - def test_different_labels_not_deduplicated(self): - """Test overlapping entities with different labels are kept.""" - predictions = [ - RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), - RecognizerResult(entity_type="LOCATION", start=5, end=15, score=0.85), - ] - - result = deduplicate_overlapping_entities(predictions) - - assert len(result) == 2 - - def test_low_overlap_not_deduplicated(self): - """Test entities with low overlap are not deduplicated.""" - predictions = [ - RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), - RecognizerResult(entity_type="PERSON", start=9, end=20, score=0.85), - ] - - result = deduplicate_overlapping_entities(predictions, overlap_threshold=0.6) - - # Overlap is only 1 char out of 10, ratio = 0.1, below threshold - assert len(result) == 2 - - def test_empty_predictions(self): - 
"""Test empty predictions list.""" - result = deduplicate_overlapping_entities([]) - assert result == [] - - def test_sorted_by_position(self): - """Test results are sorted by start position.""" - predictions = [ - RecognizerResult(entity_type="PERSON", start=20, end=30, score=0.9), - RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.85), - RecognizerResult(entity_type="PERSON", start=40, end=50, score=0.95), - ] - - result = deduplicate_overlapping_entities(predictions) - - assert result[0].start == 0 - assert result[1].start == 20 - assert result[2].start == 40 - - def test_custom_overlap_threshold(self): - """Test custom overlap threshold.""" - predictions = [ - RecognizerResult(entity_type="PERSON", start=0, end=10, score=0.9), - RecognizerResult(entity_type="PERSON", start=5, end=15, score=0.85), - ] - - # With 0.3 threshold, should deduplicate (overlap ratio = 0.5) - result = deduplicate_overlapping_entities(predictions, overlap_threshold=0.3) - assert len(result) == 1 - - # With 0.7 threshold, should keep both (overlap ratio = 0.5 < 0.7) - result = deduplicate_overlapping_entities(predictions, overlap_threshold=0.7) - assert len(result) == 2