
security: replace pickle serialization with JSON+gzip to eliminate CWE-502 #106

Open
SudipSinha wants to merge 8 commits into main from security/json-gzip-serialization

Conversation

@SudipSinha
Member

@SudipSinha SudipSinha commented Apr 1, 2026

Summary

Eliminates an arbitrary code execution vulnerability (CWE-502) by replacing pickle serialization with secure JSON+gzip for model and row data storage in the MariaDB and PVC backends.

Resolves: RHOAIENG-56132

Motivation

PR #96 secured the transport layer — adding gzip decompression middleware to safely handle compressed HTTP requests from the KServe agent on ingress. This PR secures the storage layer — replacing Python pickle with JSON+gzip for serializing data at rest in both PVC (HDF5) and MariaDB backends.

Pickle deserialization (pickle.loads) executes arbitrary code embedded in the payload, making it a classic CWE-502 vector. Even though the data originates internally, defense-in-depth demands that the serialization format itself be incapable of code execution. JSON satisfies this — json.loads can only produce dicts, lists, strings, numbers, booleans, and None.
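
To make the distinction concrete, here is a small self-contained illustration (the Exploit class and echo command are hypothetical, not code from this repository): a pickled object can smuggle a callable through __reduce__, while json.loads can only ever return plain data.

import json
import pickle

class Exploit:
    # Any object defining __reduce__ can make pickle.loads execute arbitrary code.
    def __reduce__(self):
        import os
        return (os.system, ("echo arbitrary code runs on deserialization",))

malicious_blob = pickle.dumps(Exploit())
# pickle.loads(malicious_blob)  # would invoke os.system -- the CWE-502 risk

safe = json.loads('{"inputs": [1, 2.5, "three", true, null]}')
print(safe)  # {'inputs': [1, 2.5, 'three', True, None]} -- plain data only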


Changes

Security Improvements

CWE-502 Remediation - Replace pickle with JSON+gzip:

  • Removed: Pickle serialization for Pydantic models and row data
  • Added: Secure JSON encoding with gzip compression
  • Added: Decompression bomb protection via safe_gzip_decompress() with 128 MiB size limit
  • Added: Fail-fast TypeError in json_encoder for unsupported types (prevents silent data corruption)

New Serialization Module

Created src/service/serialization/ with modular components:

models.py - Pydantic model serialization:

  • serialize_model(obj) - Convert Pydantic models to JSON+gzip bytes
  • deserialize_model(data, target_class) - Reconstruct models from JSON+gzip
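
A minimal sketch of these two helpers, assuming Pydantic v2 models (an assumption-level sketch; the real module additionally routes through the custom encoders and bounded decompression described below):

import gzip
import json
from typing import Type, TypeVar

from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)
GZIP_MAGIC = b"\x1f\x8b"

def serialize_model(obj: BaseModel) -> bytes:
    """Dump the model to JSON, then gzip the UTF-8 bytes."""
    return gzip.compress(json.dumps(obj.model_dump()).encode("utf-8"))

def deserialize_model(data: bytes, target_class: Type[T]) -> T:
    """Accept gzip-compressed or plain JSON and validate it into the target model."""
    raw = gzip.decompress(data) if data[:2] == GZIP_MAGIC else data
    return target_class.model_validate(json.loads(raw))

class Sample(BaseModel):
    name: str
    values: list[float]

blob = serialize_model(Sample(name="a", values=[1.0, 2.0]))
assert deserialize_model(blob, Sample) == Sample(name="a", values=[1.0, 2.0])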

rows.py - Row-level data serialization:

  • serialize_rows(lst, max_void_type_length, *, compresslevel=1) - Serialize nested lists to numpy void arrays (compresslevel=1 default for throughput)
  • deserialize_rows(serialized) - Deserialize back to nested lists
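
A rough sketch of the row path under the assumption that each row becomes a gzip-compressed JSON blob packed into a fixed-width numpy void column (the padding and width handling here are illustrative, not the actual implementation):

import gzip
import json
import zlib

import numpy as np

def serialize_rows(lst, max_void_type_length, *, compresslevel=1):
    """Each row -> JSON -> gzip, then all rows padded into one fixed-width void array."""
    blobs = [
        gzip.compress(json.dumps(row).encode("utf-8"), compresslevel=compresslevel)
        for row in lst
    ]
    width = max(len(blob) for blob in blobs)
    if width > max_void_type_length:
        raise ValueError(f"compressed row size {width} exceeds {max_void_type_length}")
    padded = b"".join(blob.ljust(width, b"\x00") for blob in blobs)
    return np.frombuffer(padded, dtype=np.dtype(f"V{width}"))

def deserialize_rows(serialized):
    """Undo the padding, gzip, and JSON layers; zlib stops at the end of each stream."""
    rows = []
    for cell in serialized:
        decomp = zlib.decompressobj(wbits=16 + zlib.MAX_WBITS)  # gzip container
        rows.append(json.loads(decomp.decompress(cell.tobytes()).decode("utf-8")))
    return rows

assert deserialize_rows(serialize_rows([[1, 2.5, "a"], [3, 4.5, "b"]], 4096)) == [[1, 2.5, "a"], [3, 4.5, "b"]]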

encoders.py - Custom JSON handlers:

  • NumPy array/scalar encoding
  • Binary data encoding (base64)
  • Type-safe decoding with validation
  • TypeError on unsupported types (no silent str() fallback)
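
A hedged sketch of the encoder/decoder pair; the __type__/data envelope for bytes follows what the review comments further down describe, but exact field handling in the real module may differ:

import base64
import json

import numpy as np

def json_encoder(obj):
    """default= hook for json.dumps: JSON-safe forms for numpy and bytes, TypeError otherwise."""
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, (bytes, bytearray)):
        return {"__type__": "bytes", "data": base64.b64encode(obj).decode("ascii")}
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

def json_decoder_hook(d):
    """object_hook for json.loads: restore bytes from the base64 envelope."""
    if d.get("__type__") == "bytes":
        if "data" not in d:
            raise ValueError("bytes object missing 'data' field")
        return base64.b64decode(d["data"])
    return d

encoded = json.dumps({"arr": np.arange(3), "blob": b"\x00\x01"}, default=json_encoder)
decoded = json.loads(encoded, object_hook=json_decoder_hook)
assert decoded == {"arr": [0, 1, 2], "blob": b"\x00\x01"}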

detection.py - Format detection and safe decompression:

  • is_gzip(data) / is_json(data) - Detect format from magic bytes
  • safe_gzip_decompress(data, max_size) - Bounded decompression to prevent decompression bombs
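
The bomb guard can be sketched with zlib's bounded decompression; the 128 MiB ceiling is the value stated in this PR, while the mechanism shown here is an assumption about how detection.py could implement it:

import gzip
import zlib

GZIP_MAGIC = b"\x1f\x8b"
MAX_DECOMPRESSED_SIZE = 128 * 1024 * 1024  # 128 MiB ceiling from this PR

def is_gzip(data: bytes) -> bool:
    return data[:2] == GZIP_MAGIC

def safe_gzip_decompress(data: bytes, max_size: int = MAX_DECOMPRESSED_SIZE) -> bytes:
    # Ask zlib for at most max_size output bytes; leftover compressed input means
    # the payload would have expanded past the ceiling (a potential gzip bomb).
    decomp = zlib.decompressobj(wbits=16 + zlib.MAX_WBITS)  # gzip container
    out = decomp.decompress(data, max_size)
    if decomp.unconsumed_tail:
        raise ValueError("decompressed payload exceeds size limit (possible gzip bomb)")
    return out

bomb = gzip.compress(b"\x00" * 10_000)          # a few dozen bytes that expand to 10 KB
print(len(safe_gzip_decompress(bomb)))          # 10000 -- within the default limit
try:
    safe_gzip_decompress(bomb, max_size=1_000)  # simulate a much smaller ceiling
except ValueError as err:
    print(err)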

Exception Handling

exceptions.py - Custom exception hierarchy:

  • StorageError - Base exception for storage-related errors
  • DeserializationError - Raised when deserialization fails with payload context
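
A sketch of the hierarchy, following the attributes shown in the class diagram further down this page (the constructor signature is an assumption):

class StorageError(Exception):
    """Base class for storage-layer failures."""

class DeserializationError(StorageError):
    """Raised when stored bytes cannot be decoded into the expected model."""

    def __init__(self, payload_id: str, reason: str, original_exception: Exception | None = None):
        super().__init__(f"failed to deserialize payload '{payload_id}': {reason}")
        self.payload_id = payload_id
        self.reason = reason
        self.original_exception = original_exception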

Storage Backend Updates

maria.py - MariaDB backend:

  • Updated to use serialize_model() / deserialize_model()
  • Enhanced error handling with DeserializationError exceptions
  • Improved logging with logger.exception() for better observability
  • All writes use JSON+gzip

pvc.py - PVC/HDF5 backend:

  • Updated row serialization to use serialize_rows() / deserialize_rows()
  • Updated model serialization to use serialize_model() / deserialize_model()
  • Enhanced error handling with DeserializationError exceptions
  • Distinguishes between "not found" (returns None) and "corrupted data" (raises exception)
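
From the caller's side the contract looks roughly like this (a hypothetical helper reusing the DeserializationError sketched above, not code from the backend itself):

import logging

logger = logging.getLogger(__name__)

async def load_payload_or_none(storage, payload_id: str):
    """None means the payload was never stored; corruption surfaces as an exception."""
    try:
        payload = await storage.get_partial_payload(payload_id, is_input=True, is_modelmesh=False)
    except DeserializationError:
        logger.exception("payload %s exists but could not be deserialized", payload_id)
        raise
    if payload is None:
        logger.info("payload %s not found", payload_id)
    return payload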

Bug Fixes

list_utils.py:

  • Fixed get_list_shape() crash on empty lists (IndexError)
  • Added defensive empty list check
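
The fixed helper can be as small as the one-liner discussed in the review below, shown here with the empty-list guard:

def get_list_shape(lst):
    # Empty or non-list input yields an empty shape instead of raising IndexError on lst[0].
    return [len(lst), *get_list_shape(lst[0])] if isinstance(lst, list) and lst else []

assert get_list_shape([]) == []
assert get_list_shape([[1, 2, 3], [4, 5, 6]]) == [2, 3]
assert get_list_shape(42) == []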

Test Coverage

Added comprehensive test suite:

  • tests/service/serialization/test_models.py - Model serialization (roundtrips, nested models, binary data, error handling)
  • tests/service/serialization/test_rows.py - Row serialization (dynamic void sizing, decompression safety, compression effectiveness)
  • tests/service/serialization/test_encoders.py - JSON encoder/decoder (numpy types, bytes, unsupported type rejection)
  • tests/service/serialization/test_detection.py - Format detection and safe_gzip_decompress (normal, at-limit, over-limit, bomb scenarios)
  • tests/service/utils/test_list_utils.py - List utility tests
  • tests/service/data/test_payload_reconciliation_pvc.py - Integration tests (corrupted payload, invalid format, not found behavior)

Test Plan

  • All tests passing (467 passed, 4 skipped)
  • Ruff check/format clean
  • Pyrefly type checking clean (0 errors)
  • Bandit security scan clean (0 issues)
  • Pre-commit hooks all passing

Security Impact

  • Eliminates CWE-502 (Deserialization of Untrusted Data)
  • Removes arbitrary code execution attack vector
  • Decompression bomb protection via bounded safe_gzip_decompress() (128 MiB limit)
  • Fail-fast on unexpected types: TypeError instead of silent str() conversion
  • Improved error handling with defensive validation and proper exception propagation

Related PRs and Epics

This PR builds on the security remediation program:

Epic: RHOAIENG-55574 - Security Remediation Program


Generated with Claude Code

Summary by Sourcery

Replace unsafe pickle-based storage of models and row data with JSON+gzip serialization across PVC (HDF5) and MariaDB backends, adding format detection, bounded decompression, and clearer error handling for corrupted payloads.

New Features:

  • Introduce a dedicated serialization module providing JSON+gzip model and row serialization, format detection, and custom JSON encoders for numpy and binary data.
  • Add custom storage-layer exceptions to distinguish deserialization failures from missing data in storage backends.

Bug Fixes:

  • Prevent get_list_shape() from crashing on empty lists by handling empty input defensively.

Enhancements:

  • Update PVC/HDF5 and MariaDB storage backends to use the new JSON+gzip serializers for partial payloads and row data, with stricter deserialization behavior and improved logging.
  • Tighten PVC metadata and dataset handling, including more robust file existence checks, logging, and always returning a StorageMetadata object even on partial failures.
  • Refine MariaDB storage initialization and metadata retrieval to reduce ambiguity and improve parameter validation and logging.
  • Clarify and streamline list utility responsibilities by delegating row (de)serialization to the new serialization module and keeping only list-shape and type-detection helpers.

Build:

  • Adjust linting configuration to accommodate new serialization tests.

Tests:

  • Expand and reorganize tests to cover model and row serialization, format detection, JSON encoders/decoders, PVC error paths for corrupted/invalid payloads, and list utilities, including MariaDB-specific skips for HDF5-only scenarios.

Summary by CodeRabbit

Release Notes

  • New Features

    • Implemented secure JSON + gzip-based serialization for persistent data storage
  • Improvements

    • Enhanced error reporting for serialization and deserialization failures
    • Added safeguards against decompression attacks through size validation
    • Modernized type annotations throughout the codebase

@sourcery-ai
Contributor

sourcery-ai Bot commented Apr 1, 2026

Reviewer's Guide

Replaces insecure pickle-based serialization with a JSON+gzip serialization layer (including format detection and decompression limits), wires it into MariaDB and PVC/HDF5 storage backends for models and rows, introduces storage-specific exceptions, and extends tests (including ModelMesh payload reconciliation) to validate security, error handling, and behavior differences such as distinguishing not-found from corrupted data.

Sequence diagram for PVCStorage.get_partial_payload with JSON+gzip and error handling

sequenceDiagram
    actor Service
    participant PVCStorage
    participant H5PYContext
    participant HDF5File
    participant ModelsSerialization as Serialization

    Service->>PVCStorage: get_partial_payload(payload_id, is_input, is_modelmesh)
    PVCStorage->>PVCStorage: get_lock(dataset_name)
    PVCStorage->>H5PYContext: __enter__(dataset_name, "r")
    H5PYContext->>HDF5File: open()
    HDF5File-->>H5PYContext: file_handle
    H5PYContext-->>PVCStorage: db

    PVCStorage->>HDF5File: check dataset_name in db
    alt dataset_missing
        PVCStorage-->>Service: None
        HDF5File->>H5PYContext: close()
        H5PYContext-->>PVCStorage: __exit__
    else dataset_exists
        PVCStorage->>HDF5File: read attrs[payload_id]
        alt payload_missing
            PVCStorage-->>Service: None
            HDF5File->>H5PYContext: close()
            H5PYContext-->>PVCStorage: __exit__
        else payload_present
            PVCStorage->>PVCStorage: ensure bytes(serialized_data)
            PVCStorage->>Serialization: deserialize_model(serialized_data, target_class)
            alt deserialization_success
                Serialization-->>PVCStorage: payload_model
                PVCStorage-->>Service: payload_model
                HDF5File->>H5PYContext: close()
                H5PYContext-->>PVCStorage: __exit__
            else deserialization_failure
                Serialization-->>PVCStorage: raises Exception
                PVCStorage->>PVCStorage: log exception
                PVCStorage->>Service: raise DeserializationError
            end
        end
    end

    opt unexpected_storage_error
        PVCStorage->>PVCStorage: log exception
        PVCStorage-->>Service: raise StorageError
    end

Class diagram for new JSON+gzip serialization layer and storage exceptions

classDiagram
    class ModelsSerialization {
        +bytes serialize_model(obj)
        +T deserialize_model(data, target_class)
    }

    class RowsSerialization {
        +np.ndarray serialize_rows(lst, max_void_type_length, compresslevel)
        +np.ndarray deserialize_rows(serialized)
    }

    class Encoders {
        +object json_encoder(obj)
        +object json_decoder_hook(obj)
    }

    class Detection {
        +str detect_format(data)
        +bool is_gzip(data)
        +bool is_json(data)
        +bytes safe_gzip_decompress(data, max_size)
        <<constants>> GZIP_MAGIC
        <<constants>> MAX_DECOMPRESSED_SIZE
    }

    class StorageExceptions {
        +str payload_id
        +str reason
        +Exception original_exception
    }

    class StorageError {
        <<exception>>
    }

    class DeserializationError {
        <<exception>>
    }

    class PVCStorage {
        +persist_partial_payload(payload, payload_id, is_input)
        +get_partial_payload(payload_id, is_input, is_modelmesh)
        +delete_partial_payload(request_id, is_input)
    }

    class MariaDBStorage {
        +persist_partial_payload(payload, payload_id, is_input)
        +get_partial_payload(payload_id, is_input, is_modelmesh)
        +delete_partial_payload(payload_id, is_input)
    }

    ModelsSerialization ..> Encoders : uses
    ModelsSerialization ..> Detection : uses

    RowsSerialization ..> Encoders : uses
    RowsSerialization ..> Detection : uses

    Encoders <.. ModelsSerialization
    Encoders <.. RowsSerialization

    Detection <.. ModelsSerialization
    Detection <.. RowsSerialization

    StorageExceptions <|-- DeserializationError

    PVCStorage ..> ModelsSerialization : serialize_model\ndeserialize_model
    PVCStorage ..> RowsSerialization : serialize_rows\ndeserialize_rows
    PVCStorage ..> StorageExceptions : DeserializationError

    MariaDBStorage ..> ModelsSerialization : serialize_model\ndeserialize_model
    MariaDBStorage ..> StorageExceptions : DeserializationError

File-Level Changes

Change | Details | Files
Introduce JSON+gzip serialization layer for models and row data with custom encoders and format detection.
  • Add serialization package exposing serialize_model/deserialize_model and serialize_rows/deserialize_rows as the public API.
  • Implement model serialization using Pydantic model_dump or dicts, JSON encoding with custom numpy/bytes handling, and gzip compression with support for both compressed and plain JSON on read.
  • Implement row serialization that turns nested lists into numpy void arrays of dynamically sized gzip-compressed JSON blobs, enforcing a max per-row size and using safe gzip decompression.
  • Provide format detection helpers (gzip vs JSON) and bounded gzip decompression to mitigate decompression bombs, plus JSON encoder/decoder hooks for numpy and bytes.
src/service/serialization/__init__.py
src/service/serialization/models.py
src/service/serialization/rows.py
src/service/serialization/encoders.py
src/service/serialization/detection.py
Add storage-specific exceptions and use them to distinguish deserialization failures from not-found cases.
  • Define StorageError base class and DeserializationError carrying payload id, reason, and original exception details.
  • Use DeserializationError in storage backends when JSON+gzip deserialization fails (e.g., corrupted data or invalid format) so callers can differentiate data corruption from missing records.
src/service/data/storage/exceptions.py
src/service/data/storage/pvc.py
src/service/data/storage/maria/maria.py
Wire JSON+gzip serialization into PVC/HDF5 backend for both tabular data and partial payloads, and tighten behavior around dataset/filesystem handling.
  • Replace list_utils.serialize_rows/deserialize_rows calls with the new serialization.rows equivalents, and allow write_data to accept numpy arrays or plain lists.
  • Change PVC partial payload persistence to store gzip-compressed JSON via serialize_model instead of pickled dicts in HDF5 attributes.
  • Update get_partial_payload to detect missing datasets vs present-but-corrupted data, performing JSON+gzip deserialization, converting HDF5 attribute values to bytes, and raising DeserializationError on decode/validation failures while still returning None for not-found cases.
  • Adjust delete_partial_payload naming and logic to delete by request_id, avoid creating phantom HDF5 files, and clean up empty datasets; simplify logging and path handling using pathlib.
  • Refine metadata retrieval and dataset listing helpers (locking map construction, logging, schema construction, and default StorageMetadata in error cases) without changing external contracts.
src/service/data/storage/pvc.py
src/service/utils/list_utils.py
Wire JSON+gzip serialization into MariaDB backend for partial payloads and improve schema/migration initialization and SQL usage comments.
  • Change partial payload storage to call serialize_model and store compressed JSON in LONGBLOB instead of pickled dicts.
  • Change get_partial_payload to deserialize via deserialize_model into the appropriate Pydantic model class and raise DeserializationError on failure while returning None when rows are absent.
  • Slightly tighten MariaDBStorage constructor and internal constants, keep SQL f-strings but clarify they use internal identifiers in security comments, and clean up docstrings.
  • Keep dataset read/write semantics the same while documenting that inference rows remain pickled (handled elsewhere) and that this PR mainly affects partial payload storage.
src/service/data/storage/maria/maria.py
src/service/data/storage/maria/legacy_maria_reader.py
Refactor list utilities to focus on shape and numeric detection, delegating row serialization to the new module.
  • Simplify get_list_shape to safely handle empty lists (avoid IndexError) and keep contains_non_numeric for nested lists/arrays.
  • Remove the old pickle-based serialize_rows/deserialize_rows from list_utils and rely on serialization.rows instead.
  • Add tests that cover the new empty-list behavior and the re-export of row serialization helpers from the serialization package.
src/service/utils/list_utils.py
tests/service/utils/test_list_utils.py
Extend ModelMesh payload reconciliation tests (PVC and Maria) to validate new serialization behavior, size limits, and error semantics.
  • Refactor PVC payload reconciliation tests to use TYPE_CHECKING-based Coroutine imports, rename constants for clarity, and simplify mypy type narrowing now that storage returns specific model types.
  • Add tests that directly inject corrupted/invalid-format bytes into HDF5 attributes and assert that PVCStorage.get_partial_payload raises DeserializationError, while missing payloads still return None.
  • Add tests that verify MAX_VOID_TYPE_LENGTH enforcement with non-compressible random strings under JSON+gzip, both exceeding and within limits, and that numeric/column expectations remain unchanged.
  • For MariaDB payload reconciliation tests, explicitly skip the PVC-only deserialization tests since on-disk corruption injection is not applicable to MariaDB.
tests/service/data/test_payload_reconciliation_pvc.py
tests/service/data/test_payload_reconciliation_maria.py
Add focused test suites for the new serialization layer and decompression safety.
  • Add model serialization tests covering round-trips, nested Pydantic models, binary fields, gzip vs plain JSON inputs, schema validation errors, random/empty/invalid byte sequences, and compression characteristics.
  • Add row serialization tests that verify dtype sizing, gzip usage, mixed-type handling, max-size enforcement, decompression of padded void arrays, and behavior for corrupted/non-gzip rows.
  • Add encoder/decoder tests that verify numpy arrays/scalars and bytes are encoded to JSON-safe forms, that unsupported types raise TypeError, and that bytes survive round-trips via json.dumps/loads.
  • Add detection tests ensuring gzip/JSON format detection works, that empty/unknown formats raise, and that safe_gzip_decompress enforces the decompressed size ceiling and flags potential gzip bombs.
  • Register minor Ruff rule ignore for the new test_rows module and add init.py markers for the new test packages.
tests/service/serialization/test_models.py
tests/service/serialization/test_rows.py
tests/service/serialization/test_detection.py
tests/service/serialization/test_encoders.py
tests/service/serialization/__init__.py
tests/service/utils/__init__.py
pyproject.toml


Comment thread src/service/prometheus/prometheus_publisher.py Fixed
@SudipSinha SudipSinha self-assigned this Apr 1, 2026
Contributor

@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 1 security issue, 3 other issues, and left some high level feedback:

Security issues:

  • Avoiding SQL string concatenation: untrusted input concatenated with a raw SQL query can result in SQL injection. To execute a raw query safely, use a prepared statement. SQLAlchemy provides TextualSQL to easily use prepared statements with named parameters. For complex SQL composition, use the SQL Expression Language or Schema Definition Language. In most cases, the SQLAlchemy ORM will be a better option. (link)

General comments:

  • The new JSON+gzip serialization for partial payloads in MariaDB/PVC no longer has an explicit pickle fallback (e.g. get_partial_payload now always calls deserialize_model without checking for legacy pickle bytes), which contradicts the stated backward-compatibility; consider adding pickle-format detection and a pkl.loads path for existing records in those tables.
  • The detect_format/is_json logic will treat any byte sequence starting with an ASCII digit or - (or prefixes like true/null) as JSON, which could misclassify arbitrary binary blobs and change error behavior; it may be safer to require stricter JSON detection (e.g. successful json.loads in a small try/except) before declaring data as JSON.
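
For illustration, the stricter check suggested above could look like the following sketch (whether the extra parse is acceptable for large payloads is a maintainer call):

import json

def is_json_strict(data: bytes) -> bool:
    """Declare data JSON only if it actually parses, instead of trusting the first byte."""
    try:
        json.loads(data.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False
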
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The new JSON+gzip serialization for partial payloads in MariaDB/PVC no longer has an explicit pickle fallback (e.g. `get_partial_payload` now always calls `deserialize_model` without checking for legacy pickle bytes), which contradicts the stated backward-compatibility; consider adding pickle-format detection and a `pkl.loads` path for existing records in those tables.
- The `detect_format`/`is_json` logic will treat any byte sequence starting with an ASCII digit or `-` (or prefixes like `true`/`null`) as JSON, which could misclassify arbitrary binary blobs and change error behavior; it may be safer to require stricter JSON detection (e.g. successful `json.loads` in a small try/except) before declaring data as JSON.

## Individual Comments

### Comment 1
<location path="src/service/data/storage/pvc.py" line_range="583-592" />
<code_context>
             )
             conn.commit()

     async def get_partial_payload(
         self, payload_id: str, is_input: bool, is_modelmesh: bool
-    ) -> Union[PartialPayload, KServeInferenceRequest, KServeInferenceResponse]:
</code_context>
<issue_to_address>
**issue (bug_risk):** Legacy pickle-serialized payloads are no longer readable, despite the docstring claiming automatic migration.

The previous implementation stored `pickle.dumps(payload.model_dump())` in `payload_id` attrs, but the new code only supports gzip-compressed or plain JSON. Legacy pickle data will therefore cause `deserialize_model` to raise `ValueError`, be caught, and return `None`, silently discarding existing payloads and violating the documented “automatic migration from legacy pickle data”.

If backward compatibility is required, add a pickle fallback when JSON deserialization fails, e.g.:

```python
try:
    return deserialize_model(serialized_data, target_class)
except ValueError:
    import pickle
    try:
        payload_dict = pickle.loads(serialized_data)
        model = target_class(**payload_dict)
        # Optionally re-save in the new format
        dataset.attrs[payload_id] = serialize_model(model)
        return model
    except Exception:
        logger.error(
            "Failed to deserialize legacy pickled payload for %s", payload_id,
            exc_info=True,
        )
        return None
```

This preserves legacy data while gradually migrating it to the new JSON+gzip format.
</issue_to_address>

### Comment 2
<location path="src/service/data/storage/maria/maria.py" line_range="515-524" />
<code_context>
             )
             conn.commit()

     async def get_partial_payload(
         self, payload_id: str, is_input: bool, is_modelmesh: bool
-    ) -> Union[PartialPayload, KServeInferenceRequest, KServeInferenceResponse]:
</code_context>
<issue_to_address>
**issue (bug_risk):** MariaDB partial payloads stored as pickle blobs are no longer compatible with the new JSON+gzip deserialization.

Previously, `trustyai_v2_partial_payloads.payload_data` stored `pickle.dumps(payload.model_dump())`. After this change, `get_partial_payload` calls `deserialize_model(result[0], target_class)`, which only supports JSON / gzip JSON, so it will fail on existing pickle data.

To keep previously stored payloads readable, consider a compatibility path similar to HDF5:
1. Call `deserialize_model` first.
2. On `ValueError`, fall back to `pickle.loads(result[0])` and build the model from the dict.
3. Optionally, persist the converted result back via `serialize_model` to migrate on read.

Without this, upgrading will make all legacy MariaDB payloads effectively inaccessible.
</issue_to_address>

### Comment 3
<location path="tests/service/utils/test_list_utils.py" line_range="15" />
<code_context>
+from src.service.utils.list_utils import contains_non_numeric, get_list_shape
+
+
+class TestGetListShape:
+    """Test get_list_shape function."""
+
</code_context>
<issue_to_address>
**issue (testing):** Add a test case for `get_list_shape([])` to cover the new empty-list behavior.

The new `isinstance(lst, list) and lst` guard changes behavior for `[]`, but `TestGetListShape` doesn’t currently assert the expected result for an empty list. Please add a test like:

```python
def test_empty_list(self):
    assert get_list_shape([]) == []
```

This will cover the new behavior and protect against regressions.
</issue_to_address>

### Comment 4
<location path="src/service/data/storage/maria/maria.py" line_range="509-512" />
<code_context>
            cursor.execute(
                f"INSERT INTO `{self.partial_payload_table}` (payload_id, is_input, payload_data) VALUES (?, ?, ?)",
                (payload_id, is_input, serialize_model(payload)),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with a raw SQL query can result in SQL injection. To execute a raw query safely, use a prepared statement. SQLAlchemy provides TextualSQL to easily use prepared statements with named parameters. For complex SQL composition, use the SQL Expression Language or Schema Definition Language. In most cases, the SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>


Comment thread src/service/data/storage/pvc.py
Comment thread src/service/data/storage/maria/maria.py
Comment thread tests/service/utils/test_list_utils.py
Comment thread src/service/data/storage/maria/maria.py
@SudipSinha SudipSinha marked this pull request as draft April 1, 2026 13:03
@SudipSinha SudipSinha force-pushed the security/json-gzip-serialization branch from 536e45f to 8fd5d36 on April 1, 2026 13:09

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
tests/service/data/test_payload_reconciliation_pvc.py (1)

223-225: ⚠️ Potential issue | 🟡 Minor

Stale comment references pickle instead of JSON+gzip.

The comment mentions "pickle overhead" but the serialization has been migrated to JSON+gzip.

📝 Suggested fix
         # Create a payload that is just under the limit
-        # Account for pickle overhead by using a smaller string
+        # Account for JSON+gzip overhead by using a smaller string
         allowed_string = "y" * (MAX_VOID_TYPE_LENGTH - 200)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/service/data/test_payload_reconciliation_pvc.py` around lines 223 -
225, The comment above allowed_string currently mentions "pickle overhead" which
is stale; update the comment to reference the current serialization method
(JSON+gzip) and, if necessary, adjust the reserved overhead value (the "- 200"
used with MAX_VOID_TYPE_LENGTH) to reflect JSON+gzip overhead; locate the
allowed_string definition and MAX_VOID_TYPE_LENGTH usage and replace the phrase
"pickle overhead" with "JSON+gzip overhead" (and tweak the numeric buffer if you
have measured different overhead for JSON+gzip).
src/service/data/storage/pvc.py (1)

246-257: ⚠️ Potential issue | 🟠 Major

Existing PVC row data loses its read path.

read_data() now routes byte-backed datasets through deserialize_rows(), but src/service/serialization/rows.py only accepts gzip-compressed JSON and rejects any non-gzip blob. That means pre-upgrade HDF5 rows written with the old pickle-based serializer become unreadable as soon as this lands. Please keep a legacy read/migrate path until those rows are rewritten.

Also applies to: 288-293

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/service/data/storage/pvc.py` around lines 246 - 257, read_data() now
assumes byte blobs are gzip-compressed JSON via deserialize_rows(), which will
break pre-upgrade HDF5 rows serialized with the old pickle-based serializer; add
a legacy detection and migration path: in the read_data() code path that calls
deserialize_rows(), wrap the call in a try/except and if deserialize_rows()
fails, attempt a legacy_deserialize_pickle (or similar) routine to decode the
old blob, then re-serialize the recovered rows using serialize_rows() and write
them back using write_data() or _write_raw_data() so stored rows are upgraded;
also add the same fallback logic where serialize/deserialize decisions are made
in write_data()/the block around serialize_rows() to ensure legacy rows remain
readable and are migrated on first access (referencing functions read_data,
deserialize_rows, serialize_rows, write_data, and _write_raw_data).
src/service/data/storage/maria/maria.py (1)

211-321: ⚠️ Potential issue | 🔴 Critical

Maria-backed row storage still permits pickle deserialization.

Both write_data() (line 304) and read_data() (line 354) call np.save() and np.load() with allow_pickle=True. This leaves the CWE-502 remediation incomplete for Maria-backed datasets: a tampered LONGBLOB can still trigger NumPy's object-array unpickling path, even after partial payloads moved to JSON+gzip. Move these row blobs to the new non-pickle serializer before shipping.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/service/data/storage/maria/maria.py` around lines 211 - 321, write_data
(and the corresponding read_data) currently use np.save/np.load with
allow_pickle=True which allows unsafe pickle deserialization; replace the blob
serialization/deserialization in write_data (the io.BytesIO / np.save block that
produces the LONGBLOBs) and the read_data loading path to use the project's
non-pickling serializer (e.g., the JSON+gzip encoder used elsewhere) or a new
safe pair like serialize_blob(obj) / deserialize_blob(bytes) instead of
np.save/np.load; update references around write_data, read_data, the byte_matrix
creation, and any place that reads from `trustyai_v2_table_reference` /
`dataset_reference_table` / table rows so the DB stores and restores blobs via
the safe serializer rather than pickle.
🧹 Nitpick comments (3)
tests/service/serialization/test_rows.py (1)

139-153: Missing test coverage for rows containing bytes data.

The roundtrip test covers various primitive types but doesn't include bytes data. Given that json_encoder encodes bytes as {"__type__": "bytes", "data": "..."}, a test with bytes would help validate the complete round-trip (and would currently fail due to the missing json_decoder_hook in deserialize_rows).

🧪 Suggested test to add
def test_roundtrip_preserves_bytes(self):
    """Test that roundtrip preserves bytes data."""
    original_rows = [
        [b"\x00\x01\x02", "text", 123],
        [b"\xff\xfe", "more", 456],
    ]

    serialized = serialize_rows(original_rows, max_void_type_length=MAX_VOID_TYPE_LENGTH)
    deserialized = deserialize_rows(serialized)

    for i, row in enumerate(deserialized):
        assert list(row) == original_rows[i]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/service/serialization/test_rows.py` around lines 139 - 153, The test
suite is missing coverage for bytes values which json_encoder serializes as
{"__type__":"bytes","data":"..."}, so add a test (e.g.,
test_roundtrip_preserves_bytes) that includes rows with bytes (like b"\x00\x01"
etc.) and asserts round-trip equality via serialize_rows and deserialize_rows;
then update deserialize_rows to use the json_decoder_hook (or otherwise detect
and decode objects with "__type__":"bytes") when parsing the serialized JSON so
bytes are reconstructed properly (refer to functions serialize_rows,
deserialize_rows, json_encoder, and json_decoder_hook).
tests/service/serialization/test_encoders.py (1)

88-118: Missing test for error path when bytes dict lacks "data" field.

The json_decoder_hook raises ValueError when a dict has __type__: "bytes" but is missing the data field. This error path should be covered.

🧪 Suggested test to add
def test_decode_bytes_missing_data_raises(self):
    """Test that bytes dict missing data field raises ValueError."""
    encoded = {"__type__": "bytes"}  # Missing 'data' field
    
    with pytest.raises(ValueError, match="missing 'data' field"):
        json_decoder_hook(encoded)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/service/serialization/test_encoders.py` around lines 88 - 118, Add a
unit test that covers the error path in json_decoder_hook when a dict has
"__type__": "bytes" but no "data" key: create a test named
test_decode_bytes_missing_data_raises that constructs encoded = {"__type__":
"bytes"} and uses pytest.raises(ValueError, match="missing 'data' field") to
call json_decoder_hook(encoded); this ensures the json_decoder_hook error branch
for missing data is exercised and validated.
src/service/utils/list_utils.py (1)

28-28: Consider using list unpacking for clarity.

The current concatenation works correctly, but list unpacking is more idiomatic in modern Python.

♻️ Suggested refactor
-    return [len(lst)] + get_list_shape(lst[0]) if isinstance(lst, list) and lst else []
+    return [len(lst), *get_list_shape(lst[0])] if isinstance(lst, list) and lst else []
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/service/utils/list_utils.py` at line 28, The get_list_shape function's
return currently concatenates two lists; replace that concatenation with Python
list unpacking to be more idiomatic: build a new list starting with len(lst) and
then unpack the result of get_list_shape(lst[0]) into it (using the * unpacking
operator), keeping the same guard check (isinstance(lst, list) and lst) and the
empty-list fallback.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.bandit:
- Around line 16-20: Remove the global "skips:" entries for B608, B104, B108 and
B101 from the .bandit config and instead add scoped inline suppressions where
the code actually requires them (e.g., add "# nosec B608  # table name is
internal constant" on the SQL table-name usages, "# nosec B104  # intentional
bind to 0.0.0.0 for HTTPS endpoint" where the server bind occurs, "# nosec B108 
# fallback /tmp usage overridden by env in production" at the temporary-file
fallback site, and "# nosec B101  # assert used only for development validation"
on the assert statements). Follow the existing pattern used for B324/B404/B603:
place the nosec on the exact line, include a short justification comment, and
remove the corresponding global skip entries so Bandit stays scoped to real
exceptions.

In `@Containerfile`:
- Around line 51-57: The RUN block that downloads and executes
mariadb_repo_setup is insufficiently protected; update it to pin and verify the
script's SHA256 before chmod+x and execution: after curl (in the same
EXTRAS=="mariadb" branch) obtain the expected SHA256 (hard-code the known
fingerprint for this release or download the signed checksum file), compute the
downloaded file's sha256 and compare it against the pinned value, fail the build
if it doesn't match, then proceed to chmod +x and run ./mariadb_repo_setup;
reference the existing symbols EXTRAS and mariadb_repo_setup to locate and
replace the current grep/non-empty checks with this checksum verification step.

In `@src/endpoints/evaluation/lm_evaluation_harness.py`:
- Around line 217-218: The endpoint currently allows a user-controlled
lm_eval_path to be embedded into the CLI string and run by subprocess in
convert_to_cli / where subprocess.Popen is called, which permits arbitrary
binaries; change this by validating and constructing the command explicitly:
reject or ignore any lm_eval_path that is not an allowed token (e.g., only
accept None or a single canonical value like "python -m lm_eval" or a boolean
flag in LMEvalRequest), and always build the args list as ['python', '-m',
'lm_eval', ...] (or another fixed interpreter/module tuple) instead of using the
raw lm_eval_path string and shlex.split; update LMEvalRequest handling (where
lm_eval_path is merged) to validate/normalize input and ensure convert_to_cli no
longer concatenates a user string before calling subprocess.Popen.

In `@src/service/serialization/models.py`:
- Around line 1-6: The docstring promises legacy pickle migration but the
deserialize path only tries gzip JSON and plain JSON; either implement an
explicit pickle-legacy read-and-migrate step or remove the compatibility claim.
To fix, update the module's deserialize function (e.g., deserialize / load_model
/ from_bytes) to, after failing gzip JSON and plain JSON, attempt a safe pickle
load (using pickle.loads) in a try/except, validate that the unpickled object is
a Pydantic model or dict, convert it to the current JSON structure, re-serialize
to the module's canonical gzip+JSON format and return that (and log a migration
warning), or alternatively remove the legacy wording from the module docstring
and ensure callers (src/service/data/storage/maria/maria.py and
src/service/data/storage/pvc.py) perform explicit migration before relying on
this serializer.

In `@src/service/serialization/rows.py`:
- Around line 99-103: The gzip-deserialize branch calling json.loads(json_str)
in this module must use the same decoder hook used by serialization so bytes
markers become real bytes again: change the call to json.loads(json_str,
object_hook=json_decoder_hook) and add an import for json_decoder_hook from the
encoders module (to match json_encoder used in serialize_rows); update the
exception message if needed but ensure json_decoder_hook is referenced in the
decompression path where json.loads is invoked (variables: json_str,
deserialized, row_bytes).

---

Outside diff comments:
In `@src/service/data/storage/maria/maria.py`:
- Around line 211-321: write_data (and the corresponding read_data) currently
use np.save/np.load with allow_pickle=True which allows unsafe pickle
deserialization; replace the blob serialization/deserialization in write_data
(the io.BytesIO / np.save block that produces the LONGBLOBs) and the read_data
loading path to use the project's non-pickling serializer (e.g., the JSON+gzip
encoder used elsewhere) or a new safe pair like serialize_blob(obj) /
deserialize_blob(bytes) instead of np.save/np.load; update references around
write_data, read_data, the byte_matrix creation, and any place that reads from
`trustyai_v2_table_reference` / `dataset_reference_table` / table rows so the DB
stores and restores blobs via the safe serializer rather than pickle.

In `@src/service/data/storage/pvc.py`:
- Around line 246-257: read_data() now assumes byte blobs are gzip-compressed
JSON via deserialize_rows(), which will break pre-upgrade HDF5 rows serialized
with the old pickle-based serializer; add a legacy detection and migration path:
in the read_data() code path that calls deserialize_rows(), wrap the call in a
try/except and if deserialize_rows() fails, attempt a legacy_deserialize_pickle
(or similar) routine to decode the old blob, then re-serialize the recovered
rows using serialize_rows() and write them back using write_data() or
_write_raw_data() so stored rows are upgraded; also add the same fallback logic
where serialize/deserialize decisions are made in write_data()/the block around
serialize_rows() to ensure legacy rows remain readable and are migrated on first
access (referencing functions read_data, deserialize_rows, serialize_rows,
write_data, and _write_raw_data).

In `@tests/service/data/test_payload_reconciliation_pvc.py`:
- Around line 223-225: The comment above allowed_string currently mentions
"pickle overhead" which is stale; update the comment to reference the current
serialization method (JSON+gzip) and, if necessary, adjust the reserved overhead
value (the "- 200" used with MAX_VOID_TYPE_LENGTH) to reflect JSON+gzip
overhead; locate the allowed_string definition and MAX_VOID_TYPE_LENGTH usage
and replace the phrase "pickle overhead" with "JSON+gzip overhead" (and tweak
the numeric buffer if you have measured different overhead for JSON+gzip).

---

Nitpick comments:
In `@src/service/utils/list_utils.py`:
- Line 28: The get_list_shape function's return currently concatenates two
lists; replace that concatenation with Python list unpacking to be more
idiomatic: build a new list starting with len(lst) and then unpack the result of
get_list_shape(lst[0]) into it (using the * unpacking operator), keeping the
same guard check (isinstance(lst, list) and lst) and the empty-list fallback.

In `@tests/service/serialization/test_encoders.py`:
- Around line 88-118: Add a unit test that covers the error path in
json_decoder_hook when a dict has "__type__": "bytes" but no "data" key: create
a test named test_decode_bytes_missing_data_raises that constructs encoded =
{"__type__": "bytes"} and uses pytest.raises(ValueError, match="missing 'data'
field") to call json_decoder_hook(encoded); this ensures the json_decoder_hook
error branch for missing data is exercised and validated.

In `@tests/service/serialization/test_rows.py`:
- Around line 139-153: The test suite is missing coverage for bytes values which
json_encoder serializes as {"__type__":"bytes","data":"..."}, so add a test
(e.g., test_roundtrip_preserves_bytes) that includes rows with bytes (like
b"\x00\x01" etc.) and asserts round-trip equality via serialize_rows and
deserialize_rows; then update deserialize_rows to use the json_decoder_hook (or
otherwise detect and decode objects with "__type__":"bytes") when parsing the
serialized JSON so bytes are reconstructed properly (refer to functions
serialize_rows, deserialize_rows, json_encoder, and json_decoder_hook).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: fa6ecba9-33ef-40cb-95ba-3257f8f4dada

📥 Commits

Reviewing files that changed from the base of the PR and between 44de3a3 and 536e45f.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (25)
  • .bandit
  • .dockerignore
  • .flake8
  • .github/workflows/python-tests.yaml
  • .github/workflows/security-scan.yaml
  • Containerfile
  • Dockerfile
  • pyproject.toml
  • src/endpoints/evaluation/lm_evaluation_harness.py
  • src/service/data/storage/maria/maria.py
  • src/service/data/storage/pvc.py
  • src/service/prometheus/prometheus_publisher.py
  • src/service/serialization/__init__.py
  • src/service/serialization/detection.py
  • src/service/serialization/encoders.py
  • src/service/serialization/models.py
  • src/service/serialization/rows.py
  • src/service/utils/list_utils.py
  • tests/service/data/test_payload_reconciliation_pvc.py
  • tests/service/serialization/__init__.py
  • tests/service/serialization/test_detection.py
  • tests/service/serialization/test_encoders.py
  • tests/service/serialization/test_models.py
  • tests/service/serialization/test_rows.py
  • tests/service/utils/test_list_utils.py
💤 Files with no reviewable changes (3)
  • .dockerignore
  • Dockerfile
  • .flake8

Comment thread .bandit
Comment on lines +16 to +20
skips:
- B608 # SQL injection false positives - table names are internal constants
- B104 # Binding to 0.0.0.0 is intentional for HTTPS endpoint
- B108 # /tmp usage is fallback, overridden by env var in production
- B101 # Assert statements - used for development validation, not security checks


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify current global suppressions and where scoped suppressions already exist.
python - <<'PY'
import yaml, pathlib
cfg = yaml.safe_load(pathlib.Path(".bandit").read_text())
print("Global skips:", cfg.get("skips", []))
PY

# Inspect existing inline suppressions and related hotspots.
rg -n -C2 --type=py 'nosec|B608|B104|B108' src

Repository: trustyai-explainability/trustyai-service

Length of output: 1603


🏁 Script executed:

#!/bin/bash
# Search for SQL-related patterns and queries that would trigger B608
rg -n --type=py 'execute\(|query\(|sql\s*=' src | head -20

echo "---"

# Search for 0.0.0.0 binding patterns
rg -n --type=py '0\.0\.0\.0' src

echo "---"

# Search for /tmp usage
rg -n --type=py '/tmp' src

echo "---"

# Search for assert statements
rg -n --type=py '^\s*assert\s' src | head -20

Repository: trustyai-explainability/trustyai-service

Length of output: 2525


🏁 Script executed:

#!/bin/bash
# Check if these patterns are in test files vs production code
echo "=== B608 SQL queries in src/ (production) ==="
rg -l --type=py 'execute\(|query\(' src | grep -v test

echo "=== B104 0.0.0.0 in src/ (production) ==="
rg -l --type=py '0\.0\.0\.0' src | grep -v test

echo "=== B108 /tmp in src/ (production) ==="
rg -l --type=py '/tmp' src | grep -v test

echo "=== B101 asserts in src/ (production) ==="
rg -l --type=py '^\s*assert\s' src | grep -v test

Repository: trustyai-explainability/trustyai-service

Length of output: 453


🏁 Script executed:

#!/bin/bash
# Verify if /tmp usage has env var override as claimed
rg -n --type=py '/tmp|environ|getenv' src | grep -A2 -B2 '/tmp'

echo "---"

# Check for HTTPS endpoint configuration to understand 0.0.0.0 claim
rg -n --type=py 'https|ssl|certificate|uvicorn|host.*0\.0' src | head -30

Repository: trustyai-explainability/trustyai-service

Length of output: 2690


Replace global Bandit suppressions with scoped inline # nosec annotations.

Global skips disable checks across all scanned source files. Instead, add inline # nosec B608, # nosec B104, # nosec B108, and # nosec B101 with justifications on specific lines. The codebase already uses this pattern for rules like B324, B404, and B603. Scoped suppressions prevent new violations from being masked and maintain security coverage.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.bandit around lines 16 - 20, Remove the global "skips:" entries for B608,
B104, B108 and B101 from the .bandit config and instead add scoped inline
suppressions where the code actually requires them (e.g., add "# nosec B608  #
table name is internal constant" on the SQL table-name usages, "# nosec B104  #
intentional bind to 0.0.0.0 for HTTPS endpoint" where the server bind occurs, "#
nosec B108  # fallback /tmp usage overridden by env in production" at the
temporary-file fallback site, and "# nosec B101  # assert used only for
development validation" on the assert statements). Follow the existing pattern
used for B324/B404/B603: place the nosec on the exact line, include a short
justification comment, and remove the corresponding global skip entries so
Bandit stays scoped to real exceptions.

Comment thread Containerfile
Comment on lines +51 to +57
RUN if [[ "$EXTRAS" == *"mariadb"* ]]; then \
curl --fail -LsSO https://r.mariadb.com/downloads/mariadb_repo_setup && \
# Verify script integrity (basic sanity check - script should contain mariadb_repo_setup signature)
grep -q "mariadb_repo_setup" mariadb_repo_setup || { echo "ERROR: Downloaded script appears invalid"; exit 1; } && \
[ -s mariadb_repo_setup ] || { echo "ERROR: Downloaded script is empty"; exit 1; } && \
chmod +x mariadb_repo_setup && \
./mariadb_repo_setup --mariadb-server-version="mariadb-11.4" --skip-check-installed && \


⚠️ Potential issue | 🟠 Major

Authenticate mariadb_repo_setup before executing it.

This still pulls a mutable script from a latest URL and runs it as root after only a grep/non-empty check. That does not protect against tampering or upstream replacement. Please pin the expected SHA256 and verify it before chmod +x.

🔐 Proposed hardening
+ARG MARIADB_REPO_SETUP_SHA256="<official-sha256>"
 RUN if [[ "$EXTRAS" == *"mariadb"* ]]; then  \
 		curl --fail -LsSO https://r.mariadb.com/downloads/mariadb_repo_setup && \
-		# Verify script integrity (basic sanity check - script should contain mariadb_repo_setup signature)
-		grep -q "mariadb_repo_setup" mariadb_repo_setup || { echo "ERROR: Downloaded script appears invalid"; exit 1; } && \
-		[ -s mariadb_repo_setup ] || { echo "ERROR: Downloaded script is empty"; exit 1; } && \
+		echo "${MARIADB_REPO_SETUP_SHA256}  mariadb_repo_setup" | sha256sum -c - && \
     	chmod +x mariadb_repo_setup && \

Based on learnings: MariaDB publishes official SHA256 checksums for each mariadb_repo_setup release, and the preferred integrity check here is checksum pinning rather than a content grep.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@Containerfile` around lines 51 - 57, The RUN block that downloads and
executes mariadb_repo_setup is insufficiently protected; update it to pin and
verify the script's SHA256 before chmod+x and execution: after curl (in the same
EXTRAS=="mariadb" branch) obtain the expected SHA256 (hard-code the known
fingerprint for this release or download the signed checksum file), compute the
downloaded file's sha256 and compare it against the pinned value, fail the build
if it doesn't match, then proceed to chmod +x and run ./mariadb_repo_setup;
reference the existing symbols EXTRAS and mariadb_repo_setup to locate and
replace the current grep/non-empty checks with this checksum verification step.

Comment thread src/endpoints/evaluation/lm_evaluation_harness.py
Comment on lines +217 to +218
# Arguments are safely parsed using shlex.split() which prevents command injection
p = subprocess.Popen(shlex.split(job.argument), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # nosec B603


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether request input can influence executed command path.
rg -n -C3 'NON_CLI_ARGUMENTS|lm_eval_path|convert_to_cli|job\.argument|subprocess\.Popen|shlex\.split' src/endpoints/evaluation/lm_evaluation_harness.py

Repository: trustyai-explainability/trustyai-service

Length of output: 2317


🏁 Script executed:

#!/bin/bash
# Find the LMEvalJob class definition and where job.argument is set
rg -n 'class LMEvalJob|argument\s*=' src/endpoints/evaluation/lm_evaluation_harness.py | head -20

Repository: trustyai-explainability/trustyai-service

Length of output: 339


🏁 Script executed:

#!/bin/bash
# Find the POST endpoint handler that creates the job
rg -n '@.*post|def.*evaluate|def.*submit' src/endpoints/evaluation/lm_evaluation_harness.py

Repository: trustyai-explainability/trustyai-service

Length of output: 155


🏁 Script executed:

#!/bin/bash
# Verify the full flow: where convert_to_cli is called and job creation
rg -n -A10 'convert_to_cli\(request\)' src/endpoints/evaluation/lm_evaluation_harness.py

Repository: trustyai-explainability/trustyai-service

Length of output: 458


🏁 Script executed:

#!/bin/bash
# Show the endpoint handler function in full context (around line 238)
sed -n '238,260p' src/endpoints/evaluation/lm_evaluation_harness.py

Repository: trustyai-explainability/trustyai-service

Length of output: 1080


🏁 Script executed:

#!/bin/bash
# Check if there's any validation logic before convert_to_cli is called
rg -n -B5 'convert_to_cli' src/endpoints/evaluation/lm_evaluation_harness.py | grep -A5 'def.*job'

Repository: trustyai-explainability/trustyai-service

Length of output: 388


lm_eval_path is user-controllable and bypasses the safety of shlex.split().

The POST endpoint accepts lm_eval_path as a request parameter (line 62, merged into LMEvalRequest at line 93) with no validation. convert_to_cli() uses this directly to construct the command string, which is then executed at line 218. An attacker can set lm_eval_path to any binary (e.g., /bin/sh, /bin/rm), and shlex.split() will not prevent execution—it only prevents shell injection. The # nosec B603 suppression masks a genuine remote code execution vulnerability.

Add validation to ensure only python -m lm_eval execution is allowed:

-    # Arguments are safely parsed using shlex.split() which prevents command injection
-    p = subprocess.Popen(shlex.split(job.argument), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)  # nosec B603
+    argv = shlex.split(job.argument)
+    if argv[:3] != [sys.executable, "-m", "lm_eval"]:
+        raise HTTPException(status_code=400, detail="Only `python -m lm_eval` execution is allowed.")
+    p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Arguments are safely parsed using shlex.split() which prevents command injection
p = subprocess.Popen(shlex.split(job.argument), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # nosec B603
argv = shlex.split(job.argument)
if argv[:3] != [sys.executable, "-m", "lm_eval"]:
    raise HTTPException(status_code=400, detail="Only `python -m lm_eval` execution is allowed.")
p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
🧰 Tools
🪛 Ruff (0.15.7)

[error] 218-218: subprocess call: check for execution of untrusted input

(S603)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/endpoints/evaluation/lm_evaluation_harness.py` around lines 217 - 218,
The endpoint currently allows a user-controlled lm_eval_path to be embedded into
the CLI string and run by subprocess in convert_to_cli / where subprocess.Popen
is called, which permits arbitrary binaries; change this by validating and
constructing the command explicitly: reject or ignore any lm_eval_path that is
not an allowed token (e.g., only accept None or a single canonical value like
"python -m lm_eval" or a boolean flag in LMEvalRequest), and always build the
args list as ['python', '-m', 'lm_eval', ...] (or another fixed
interpreter/module tuple) instead of using the raw lm_eval_path string and
shlex.split; update LMEvalRequest handling (where lm_eval_path is merged) to
validate/normalize input and ensure convert_to_cli no longer concatenates a user
string before calling subprocess.Popen.

Comment thread src/service/serialization/models.py Outdated
Comment thread src/service/serialization/rows.py Outdated
@SudipSinha SudipSinha added the enhancement New feature or request label Apr 1, 2026
@SudipSinha SudipSinha force-pushed the security/json-gzip-serialization branch from 8fd5d36 to ff1fcd1 Compare April 8, 2026 15:29
"""
# lgtm[py/weak-sensitive-data-hashing]
md5_hash = hashlib.md5(content.encode("utf-8"), usedforsecurity=False).digest() # nosec B324 - MD5 used for UUID generation, not security
md5_hash = hashlib.md5(content.encode("utf-8"), usedforsecurity=False).digest() # nosec B324 # CodeQL[py/weak-crypto]: MD5 used for UUID generation, not security

Check warning

Code scanning / Bandit

Use of insecure MD2, MD4, MD5, or SHA1 hash function.
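
For context on why this finding is suppressed: a deterministic, name-based UUID derived from an MD5 digest (the same idea behind Java's UUID.nameUUIDFromBytes) can be sketched as follows. The function name is hypothetical and this is not the code in prometheus_publisher.py:

```python
import hashlib
import uuid


def name_uuid_from_string(content: str) -> uuid.UUID:
    """Derive a stable, name-based UUID from a string, as Java's UUID.nameUUIDFromBytes does."""
    # MD5 is used purely as a deterministic 128-bit mapping, not for security.
    digest = hashlib.md5(content.encode("utf-8"), usedforsecurity=False).digest()  # nosec B324
    # version=3 marks this as an MD5 name-based UUID (RFC 4122).
    return uuid.UUID(bytes=digest, version=3)


print(name_uuid_from_string("model-a_input"))
```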
@SudipSinha SudipSinha added the ok-to-test Proceed with CI testing label Apr 8, 2026
SudipSinha and others added 5 commits May 7, 2026 15:55
…ion code

- Fix B324 (HIGH): Add usedforsecurity=False to MD5 hash in prometheus_publisher.py
  MD5 is used only for UUID generation (matching Java's UUID.nameUUIDFromBytes()),
  not for cryptographic purposes

- Document B301 pickle usage with nosec comments in maria.py, pvc.py, and list_utils.py
  Pickle deserialization is safe here as it only processes internally generated data
  from trusted sources (database and HDF5 storage)

- Add .bandit configuration to skip false positives:
  B608 (SQL injection - table names are constants)
  B104 (0.0.0.0 binding - intentional for HTTPS endpoint)
  B108 (/tmp usage - overridden by env var in production)
  B403/B404/B603 (subprocess/pickle imports - usage reviewed)
  B101 (assert statements - development validation only)

Resolves: RHOAIENG-55581

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
…dit config

Address code review feedback from @RobGeada and Sourcery AI:

1. **Pickle Deserialization Security (maria.py, pvc.py, list_utils.py)**:
   - Add explicit SECURITY NOTE in docstrings warning that pickle
     deserialization functions expect trusted internal data only
   - Clarify that these functions should not be used with user-supplied
     or external data to prevent potential deserialization attacks

2. **MD5 Hash Suppression (prometheus_publisher.py)**:
   - Add LGTM suppression comment for CodeQL scanner
   - Add Bandit nosec B324 comment for consistency
   - Keep usedforsecurity=False flag and documentation

3. **Narrow Bandit Configuration**:
   - Move B403 (pickle import) from global skip to inline nosec comments
   - Move B404 (subprocess import) from global skip to inline nosec comment
   - Move B603 (subprocess.Popen) from global skip to inline nosec comment
   - Add security justification comments at each import/usage site
   - Retain global skips only for B608, B104, B108, B101

Benefits:
- Future code changes will be properly scanned for new pickle/subprocess usage
- Security concerns are documented at the point of use
- Reduces risk of masking new security issues in other parts of codebase

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
…E-502

Eliminates arbitrary code execution vulnerability (CWE-502) by replacing
pickle with secure JSON+gzip serialization for model and row data storage.

Changes:
- Add serialization/ module with JSON+gzip encoding for Pydantic models and rows
- Implement automatic pickle format detection for backward compatibility
- Update maria.py and pvc.py to use new serialization APIs
- Add comprehensive test coverage (79 tests passing)
- Fix empty list handling in list_utils.py
- Update error messages to reflect new serialization format

Security Impact:
- Eliminates CWE-502 (Deserialization of Untrusted Data)
- Removes arbitrary code execution attack vector
- Maintains backward compatibility via format detection

Resolves: RHOAIENG-56132

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
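
As a rough illustration of the model-serialization flow this commit describes (a simplified sketch, not the implementation in src/service/serialization/models.py; the example model is invented for the demo):

```python
import gzip
import json
from typing import TypeVar

from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)


class ExamplePayload(BaseModel):
    """Hypothetical stand-in model used only for this illustration."""

    payload_id: str
    data: list[float]


def serialize_model(obj: BaseModel) -> bytes:
    """Dump a Pydantic model to JSON and gzip the UTF-8 bytes."""
    return gzip.compress(obj.model_dump_json().encode("utf-8"))


def deserialize_model(data: bytes, target_class: type[T]) -> T:
    """Decompress, parse JSON (which cannot execute code), and validate against the schema."""
    payload = json.loads(gzip.decompress(data).decode("utf-8"))
    return target_class.model_validate(payload)


blob = serialize_model(ExamplePayload(payload_id="abc", data=[1.0, 2.5]))
restored = deserialize_model(blob, ExamplePayload)
assert restored.payload_id == "abc"
```

Unlike pickle.loads, json.loads can only yield plain data types, so even a tampered blob cannot trigger code execution; at worst it fails schema validation.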
…alization

Add custom DeserializationError exception for better observability of
deserialization failures. Implement comprehensive test coverage for edge
cases and error scenarios (15 new tests). Remove all references to pickle
from documentation.

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
- Add safe_gzip_decompress() with 128 MiB size limit to prevent
  decompression bombs (used in both model and row deserialization)
- Replace silent str(obj) fallback in json_encoder with TypeError
  to prevent silent data corruption on unexpected types
- Add compresslevel parameter to serialize_rows (default=1 for
  throughput on the hot path)
- Remove stale pickle references from docstrings, comments, and tests
- Remove unnecessary backward-compatibility comments
- Add PLR2004 per-file-ignore for test_rows.py to allow test literals

Co-Authored-By: Claude Opus 4.6 <[email protected]>
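
A simplified sketch of what the row path in this commit amounts to, ignoring the numpy void packing and size limits of the real serialize_rows/deserialize_rows; the encoder and decoder hooks below are minimal approximations of json_encoder/json_decoder_hook:

```python
import base64
import gzip
import json
from typing import Any


def _encode(obj: Any) -> Any:
    """Minimal stand-in for json_encoder: only bytes need special handling here."""
    if isinstance(obj, (bytes, bytearray)):
        return {"__type__": "bytes", "data": base64.b64encode(bytes(obj)).decode("ascii")}
    raise TypeError(f"Unsupported type for serialization: {type(obj).__name__}")


def _decode(obj: dict) -> Any:
    """Minimal stand-in for json_decoder_hook: restore bytes from the marker dict."""
    if obj.get("__type__") == "bytes":
        return base64.b64decode(obj["data"])
    return obj


def serialize_rows(rows: list[list[Any]], *, compresslevel: int = 1) -> bytes:
    """JSON-encode nested rows and gzip them; compresslevel=1 favors throughput."""
    return gzip.compress(json.dumps(rows, default=_encode).encode("utf-8"), compresslevel=compresslevel)


def deserialize_rows(blob: bytes) -> list[list[Any]]:
    """Decompress and parse rows, restoring bytes values via the object hook."""
    return json.loads(gzip.decompress(blob).decode("utf-8"), object_hook=_decode)


rows = [[1, "a", b"\x00\xff"], [2, "b", b"raw"]]
assert deserialize_rows(serialize_rows(rows)) == rows
```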
@SudipSinha SudipSinha force-pushed the security/json-gzip-serialization branch from ff1fcd1 to f1dcafd Compare May 7, 2026 19:33
@SudipSinha SudipSinha added the python Pull requests that update python code label May 7, 2026
- Restore data_dir.exists() guard in PVCStorage.__init__ to prevent
  crash when data directory doesn't exist yet
- Add Path.exists() check in delete_dataset before opening HDF5 in "a"
  mode to prevent phantom file creation
- Re-raise Pydantic ValidationError in deserialize_model instead of
  swallowing it via the generic ValueError handler
- Restore migration task done_callback in MariaDB to suppress
  "Task exception was never retrieved" warnings
- Add json_decoder_hook to deserialize_rows for bytes roundtrip fidelity

Co-Authored-By: Claude Opus 4.6 <[email protected]>
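
One of the bullets above restores the MariaDB migration-task done_callback. A minimal sketch of such a callback that retrieves and also logs the exception (going one step beyond just suppressing the warning, as the later review prompt suggests; naming is hypothetical):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


def _log_migration_result(task: asyncio.Task) -> None:
    """Done-callback that retrieves the task result so failures are visible, not swallowed."""
    if task.cancelled():
        return
    exc = task.exception()  # retrieving it also prevents "Task exception was never retrieved"
    if exc is not None:
        logger.error("Background migration failed", exc_info=exc)


# Hypothetical wiring inside the storage backend:
# self._migration_task = asyncio.create_task(self._migrate())
# self._migration_task.add_done_callback(_log_migration_result)
```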
@SudipSinha SudipSinha marked this pull request as ready for review May 7, 2026 19:55
@sourcery-ai sourcery-ai Bot left a comment

Hey - I've found 16 security issues, 3 other issues, and left some high level feedback:

Security issues:

  • Avoiding SQL string concatenation (reported at 16 locations in src/service/data/storage/maria/maria.py; see Comments 4-19 below): untrusted input concatenated with a raw SQL query can result in SQL injection. To execute raw queries safely, use prepared statements. SQLAlchemy provides TextualSQL for prepared statements with named parameters; for complex SQL composition, use the SQL Expression Language or Schema Definition Language. In most cases, the SQLAlchemy ORM is a better option. (link)

General comments:

  • The MariaDBStorage init signature now requires non-optional user/password/host/database parameters (previously defaulted), which can break existing instantiations; consider keeping backward-compatible defaults or adding an alternate constructor/factory to avoid runtime regressions.
  • Both PVCStorage.get_partial_payload and MariaDBStorage.get_partial_payload now raise DeserializationError on corruption instead of returning None; review upstream call sites to ensure they don’t blanket-catch Exception and that they distinguish between "not found" (None) and deserialization failures to avoid masking data-integrity issues.
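
As a rough illustration of the second point above, a call site has to treat a missing payload and a corrupted payload differently; the method signature and wiring here are assumptions for the sketch, not code from this PR:

```python
import logging

from src.service.data.storage.exceptions import DeserializationError

logger = logging.getLogger(__name__)


async def load_partial_payload(storage, payload_id: str):
    """Distinguish 'not stored yet' (None) from 'stored but corrupted' (DeserializationError)."""
    try:
        payload = await storage.get_partial_payload(payload_id, is_input=True)
    except DeserializationError:
        # Corrupted data must surface, not be silently treated as missing.
        logger.exception("Stored payload %s could not be deserialized", payload_id)
        raise
    if payload is None:
        return None  # nothing persisted yet; keep waiting for the matching half
    return payload
```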
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The MariaDBStorage __init__ signature now requires non-optional user/password/host/database parameters (previously defaulted), which can break existing instantiations; consider keeping backward-compatible defaults or adding an alternate constructor/factory to avoid runtime regressions.
- Both PVCStorage.get_partial_payload and MariaDBStorage.get_partial_payload now raise DeserializationError on corruption instead of returning None; review upstream call sites to ensure they don’t blanket-catch Exception and that they distinguish between "not found" (None) and deserialization failures to avoid masking data-integrity issues.

## Individual Comments

### Comment 1
<location path="src/service/serialization/models.py" line_range="56" />
<code_context>
+    return gzip.compress(json_str.encode("utf-8"))
+
+
+def deserialize_model[T: BaseModel](data: bytes, target_class: type[T]) -> T:
+    """Deserialize and validate data against expected Pydantic schema.
+
</code_context>
<issue_to_address>
**issue (bug_risk):** The generic function syntax `def deserialize_model[T: BaseModel]` is not valid on current Python versions and will raise a SyntaxError.

This syntax relies on PEP 695, which is only available from Python 3.12. For compatibility with 3.10–3.11, please use a TypeVar-based generic instead, for example:

```python
from typing import Type, TypeVar

T = TypeVar("T", bound=BaseModel)

def deserialize_model(data: bytes, target_class: Type[T]) -> T:
    ...
```

This keeps the function type-safe while remaining compatible with current runtimes.
</issue_to_address>

### Comment 2
<location path="src/service/data/storage/pvc.py" line_range="740" />
<code_context>
+                "ModelMesh" if is_modelmesh else "KServe",
+                "input" if is_input else "output",
+            )
+            raise DeserializationError(
+                payload_id=payload_id,
+                reason=f"Failed to deserialize {'ModelMesh' if is_modelmesh else 'KServe'} "
</code_context>
<issue_to_address>
**issue (bug_risk):** DeserializationError is used here but not imported, which will cause a NameError at runtime.

Please import `DeserializationError` from its defining module (likely `src.service.data.storage.exceptions`) so this error path raises the intended storage exception instead of a `NameError`.
</issue_to_address>

### Comment 3
<location path="src/service/data/storage/maria/maria.py" line_range="606" />
<code_context>
+                "ModelMesh" if is_modelmesh else "KServe",
+                "input" if is_input else "output",
+            )
+            raise DeserializationError(
+                payload_id=payload_id,
+                reason=f"Failed to deserialize {'ModelMesh' if is_modelmesh else 'KServe'} "
</code_context>
<issue_to_address>
**issue (bug_risk):** DeserializationError is raised here but the class is not imported in this module.

This will raise a `NameError` on the first deserialization failure. Please import `DeserializationError` in this module (e.g. `from src.service.data.storage.exceptions import DeserializationError`) so the intended exception is raised.
</issue_to_address>

### Comment 4
<location path="src/service/data/storage/maria/maria.py" line_range="165-168" />
<code_context>
            cursor.execute(
                f"SELECT table_idx FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 -- table name from internal constant
                (dataset_name,),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 5
<location path="src/service/data/storage/maria/maria.py" line_range="175-178" />
<code_context>
            cursor.execute(
                f"SELECT metadata FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 -- table name from internal constant
                (dataset_name,),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 6
<location path="src/service/data/storage/maria/maria.py" line_range="188-191" />
<code_context>
                cursor.execute(
                    f"SELECT dataset_name FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 -- table name from internal constant
                    (dataset_name,),
                )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 7
<location path="src/service/data/storage/maria/maria.py" line_range="209-212" />
<code_context>
            cursor.execute(
                f"SELECT n_rows FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 -- table name from internal constant
                (dataset_name,),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 8
<location path="src/service/data/storage/maria/maria.py" line_range="220" />
<code_context>
            cursor.execute(f"SHOW COLUMNS FROM {table_name}")
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 9
<location path="src/service/data/storage/maria/maria.py" line_range="270-273" />
<code_context>
                cursor.execute(
                    f"INSERT INTO `{self.dataset_reference_table}` (dataset_name, metadata, n_rows) VALUES (?, ?, 0)",  # noqa: S608 -- table name from internal constant
                    (dataset_name, json.dumps(metadata)),
                )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 10
<location path="src/service/data/storage/maria/maria.py" line_range="276-279" />
<code_context>
                cursor.execute(
                    f"SELECT table_idx FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 -- table name from internal constant
                    (dataset_name,),
                )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 11
<location path="src/service/data/storage/maria/maria.py" line_range="345-349" />
<code_context>
            cursor.execute(
                f"UPDATE `{self.dataset_reference_table}` SET n_rows=? WHERE dataset_name=?",  # noqa: S608 -- table name from internal constant
                (
                    nrows + len(new_rows),
                    dataset_name,
                ),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 12
<location path="src/service/data/storage/maria/maria.py" line_range="380-383" />
<code_context>
            cursor.execute(
                f"SELECT * FROM `{table_name}` ORDER BY row_idx ASC LIMIT ? OFFSET ?",  # noqa: S608 -- table name from _get_clean_table_name()
                (n_rows, start_row),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 13
<location path="src/service/data/storage/maria/maria.py" line_range="564-567" />
<code_context>
            cursor.execute(
                f"INSERT INTO `{self.partial_payload_table}` (payload_id, is_input, payload_data) VALUES (?, ?, ?)",  # noqa: S608 -- table name from internal constant
                (payload_id, is_input, serialize_model(payload)),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 14
<location path="src/service/data/storage/maria/maria.py" line_range="578-581" />
<code_context>
            cursor.execute(
                f"SELECT payload_data FROM `{self.partial_payload_table}` WHERE payload_id=? AND is_input=?",  # noqa: S608 -- table name from internal constant
                (payload_id, is_input),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 15
<location path="src/service/data/storage/maria/maria.py" line_range="616-619" />
<code_context>
            cursor.execute(
                f"DELETE FROM {self.partial_payload_table} WHERE payload_id=? AND is_input=?",  # noqa: S608 -- table name from internal constant
                (payload_id, is_input),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 16
<location path="src/service/data/storage/maria/maria.py" line_range="649-652" />
<code_context>
            cursor.execute(
                f"DELETE FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 -- table name from internal constant
                (dataset_name,),
            )
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 17
<location path="src/service/data/storage/maria/maria.py" line_range="653" />
<code_context>
            cursor.execute(f"DROP TABLE IF EXISTS `{table_name}`")
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 18
<location path="src/service/data/storage/maria/maria.py" line_range="667" />
<code_context>
            cursor.execute(f"DROP TABLE IF EXISTS `{self.dataset_reference_table}`")
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

### Comment 19
<location path="src/service/data/storage/maria/maria.py" line_range="668" />
<code_context>
            cursor.execute(f"DROP TABLE IF EXISTS `{self.partial_payload_table}`")
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

*Source: opengrep*
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread src/service/serialization/models.py
Comment thread src/service/data/storage/pvc.py
Comment thread src/service/data/storage/maria/maria.py
Comment thread src/service/data/storage/maria/maria.py
Comment on lines 175 to 178
cursor.execute(
-    f"SELECT metadata FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 # table name is internal, not user input
+    f"SELECT metadata FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 -- table name from internal constant
    (dataset_name,),
)
security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

Comment on lines 616 to 619
cursor.execute(
-    f"DELETE FROM {self.partial_payload_table} WHERE payload_id=? AND is_input=?",  # noqa: S608 # table name is internal, not user input
+    f"DELETE FROM {self.partial_payload_table} WHERE payload_id=? AND is_input=?",  # noqa: S608 -- table name from internal constant
    (payload_id, is_input),
)
security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

Comment on lines 649 to 652
cursor.execute(
-    f"DELETE FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 # table name is internal, not user input
+    f"DELETE FROM `{self.dataset_reference_table}` WHERE dataset_name=?",  # noqa: S608 -- table name from internal constant
    (dataset_name,),
)
security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

    (dataset_name,),
)
-cursor.execute(f"DROP TABLE IF EXISTS `{table_name}`")  # nosec S608
+cursor.execute(f"DROP TABLE IF EXISTS `{table_name}`")
security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

with self.connection_manager as (conn, cursor):
-    cursor.execute(f"DROP TABLE IF EXISTS `{self.dataset_reference_table}`")  # nosec S608
-    cursor.execute(f"DROP TABLE IF EXISTS `{self.partial_payload_table}`")  # nosec S608
+    cursor.execute(f"DROP TABLE IF EXISTS `{self.dataset_reference_table}`")
security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

cursor.execute(f"DROP TABLE IF EXISTS `{self.dataset_reference_table}`") # nosec S608
cursor.execute(f"DROP TABLE IF EXISTS `{self.partial_payload_table}`") # nosec S608
cursor.execute(f"DROP TABLE IF EXISTS `{self.dataset_reference_table}`")
cursor.execute(f"DROP TABLE IF EXISTS `{self.partial_payload_table}`")
security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (9)
tests/service/data/test_payload_reconciliation_pvc.py (1)

384-387: 💤 Low value

Event loop is not closed; consider asyncio.run for the test driver.

run_async_test creates a new event loop but never calls loop.close(), leaking the loop and any associated resources between tests. Using asyncio.run is simpler and properly tears down the loop.

♻️ Suggested change
 def run_async_test(coro: Coroutine) -> None:
     """Run an async test coroutine in a new event loop."""
-    loop = asyncio.new_event_loop()
-    return loop.run_until_complete(coro)
+    return asyncio.run(coro)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/service/data/test_payload_reconciliation_pvc.py` around lines 384 -
387, The helper run_async_test creates a new event loop but never closes it,
leaking resources; replace its implementation to use asyncio.run(coro) (which
creates and closes the loop for you) or ensure you call loop.close() after
loop.run_until_complete(coro). Locate the run_async_test function and either
change its body to "return asyncio.run(coro)" or wrap run_until_complete in a
try/finally that calls loop.close() to properly tear down the event loop.
src/service/data/storage/pvc.py (3)

713-728: 💤 Low value

Redundant isinstance(..., np.ndarray) clause.

serialized_data either is bytes or it isn't; the np.ndarray disjunct is fully subsumed by not isinstance(serialized_data, bytes). Simplify and the intent becomes clearer.

♻️ Suggested change
-                    try:
-                        # Convert to bytes if needed (HDF5 attributes may return numpy arrays)
-                        if isinstance(serialized_data, np.ndarray) or not isinstance(
-                            serialized_data, bytes
-                        ):
-                            serialized_data = bytes(serialized_data)
+                    try:
+                        # HDF5 attributes can return numpy void/array; coerce to bytes.
+                        if not isinstance(serialized_data, bytes):
+                            serialized_data = bytes(serialized_data)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/service/data/storage/pvc.py` around lines 713 - 728, The
isinstance(serialized_data, np.ndarray) check is redundant because the following
not isinstance(serialized_data, bytes) already covers non-bytes types; update
the conversion logic in the try block that handles serialized_data so it only
checks "not isinstance(serialized_data, bytes)" before calling
bytes(serialized_data). Keep the rest of the flow intact (determining
target_class as PartialPayload, KServeInferenceRequest, or
KServeInferenceResponse and calling deserialize_model(serialized_data,
target_class)).

290-316: ⚡ Quick win

Simplify the dispatch in write_data — the ndarray/list branches are redundant.

The elif at Lines 298–304 reduces to "non-numeric data, serialize". The two clauses inside the elif are exhaustive partitions of "any object with non-numeric content," so the existing structure can be flattened. This also makes the numeric-only ndarray fast path more obvious.

♻️ Suggested change
-        if isinstance(new_rows, np.ndarray) and not list_utils.contains_non_numeric(
-            new_rows
-        ):
-            await self._write_raw_data(dataset_name, new_rows, column_names)
-        elif (
-            isinstance(new_rows, np.ndarray)
-            and list_utils.contains_non_numeric(new_rows)
-        ) or (
-            not isinstance(new_rows, np.ndarray)
-            and list_utils.contains_non_numeric(new_rows)
-        ):
-            serialized = serialize_rows(new_rows, MAX_VOID_TYPE_LENGTH)
-            arr = np.array(serialized)
-            if arr.ndim == 1:
-                arr = arr.reshape(-1, 1)
-            await self._write_raw_data(
-                dataset_name,
-                arr,
-                column_names,
-                is_bytes=True,
-            )
-        else:
-            await self._write_raw_data(dataset_name, np.array(new_rows), column_names)
+        if list_utils.contains_non_numeric(new_rows):
+            serialized = serialize_rows(new_rows, MAX_VOID_TYPE_LENGTH)
+            arr = np.array(serialized)
+            if arr.ndim == 1:
+                arr = arr.reshape(-1, 1)
+            await self._write_raw_data(
+                dataset_name, arr, column_names, is_bytes=True
+            )
+        elif isinstance(new_rows, np.ndarray):
+            await self._write_raw_data(dataset_name, new_rows, column_names)
+        else:
+            await self._write_raw_data(dataset_name, np.array(new_rows), column_names)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/service/data/storage/pvc.py` around lines 290 - 316, The write_data
dispatch is overly verbose: collapse the redundant ndarray/list checks into a
simple three-way branch in write_data — first, if new_rows is an ndarray and not
list_utils.contains_non_numeric(new_rows) call await
self._write_raw_data(dataset_name, new_rows, column_names); second, elif
list_utils.contains_non_numeric(new_rows) then call serialized =
serialize_rows(new_rows, MAX_VOID_TYPE_LENGTH); arr = np.array(serialized); if
arr.ndim == 1: arr = arr.reshape(-1, 1); then await
self._write_raw_data(dataset_name, arr, column_names, is_bytes=True); else
(numeric non-ndarray case) call await self._write_raw_data(dataset_name,
np.array(new_rows), column_names). This simplifies logic while keeping use of
serialize_rows, MAX_VOID_TYPE_LENGTH, list_utils.contains_non_numeric, np.array,
and _write_raw_data intact.

318-346: ⚖️ Poor tradeoff

_read_raw_data swallows out-of-range start_row silently.

When start_row > dataset.shape[0], the function only logs a warning and then returns dataset[start_row:end_row], which yields an empty slice. Then in read_data (Line 344), len(read) is 0, so the read[0].dtype branch is short-circuited and an empty array is returned regardless of dataset content. That's reasonable, but consider raising IndexError/ValueError for clearly invalid inputs instead of returning empty data, which can mask caller bugs (e.g., an off-by-one in pagination).

♻️ Suggested change
-                if start_row > dataset.shape[0]:
-                    logger.warning(
-                        "Requested a data read from start_row=%d, but dataset "
-                        "only has %d rows. An empty array will be returned.",
-                        start_row,
-                        dataset.shape[0],
-                    )
-                return dataset[start_row:end_row]
+                if start_row > dataset.shape[0]:
+                    msg = (
+                        f"start_row={start_row} exceeds dataset rows="
+                        f"{dataset.shape[0]} for {allocated_dataset_name}"
+                    )
+                    raise IndexError(msg)
+                return dataset[start_row:end_row]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/service/data/storage/pvc.py` around lines 318 - 346, _read_raw_data
currently only logs a warning when start_row > dataset.shape[0] and returns an
empty slice, which hides caller bugs; change this to raise a clear exception
(IndexError or ValueError) instead of returning an empty array: inside
_read_raw_data (working with H5PYContext and the dataset variable) check if
start_row > dataset.shape[0] and raise an IndexError with a descriptive message
(include allocated_dataset_name and the invalid start_row), so callers of
read_data/deserializers like deserialize_rows get explicit failure rather than
silent empty results; keep the existing behavior for start_row ==
dataset.shape[0] if you want an empty result, but ensure the raised exception
covers strictly out-of-range starts.
src/service/data/storage/exceptions.py (1)

36-37: 💤 Low value

Use explicit is not None check.

if original_exception: invokes truthiness on the exception. While standard Exception instances are truthy, custom exception subclasses can override __bool__/__len__ and produce a falsy instance, causing the cause to be silently dropped from the message. Prefer an identity check.

♻️ Suggested change
-        if original_exception:
+        if original_exception is not None:
             message += f" (caused by: {type(original_exception).__name__}: {original_exception})"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/service/data/storage/exceptions.py` around lines 36 - 37, The conditional
currently uses truthiness on original_exception which can fail for exceptions
overriding __bool__/__len__; update the check in the exceptions message
construction to use an explicit identity check (replace "if original_exception:"
with "if original_exception is not None:") so the cause is always included when
provided; keep the existing message formatting that appends f" (caused by:
{type(original_exception).__name__}: {original_exception})" unchanged.
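
For reference, the exception under discussion has roughly this shape; this is a sketch reconstructed from the attribute names and message format quoted in the review, not a verbatim copy of exceptions.py:

```python
class DeserializationError(Exception):
    """Raised when stored data cannot be deserialized, carrying payload context."""

    def __init__(self, payload_id: str, reason: str, original_exception: Exception | None = None) -> None:
        message = f"Failed to deserialize payload '{payload_id}': {reason}"
        if original_exception is not None:  # identity check, as suggested above
            message += f" (caused by: {type(original_exception).__name__}: {original_exception})"
        super().__init__(message)
        self.payload_id = payload_id
        self.reason = reason
        self.original_exception = original_exception
```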
src/service/serialization/detection.py (1)

98-106: 💤 Low value

LGTM on the +1 sentinel pattern.

Reading max_size + 1 and then comparing len(result) > max_size is a clean way to detect a decompression bomb without materializing the full payload. Note that safe_gzip_decompress can also raise OSError/gzip.BadGzipFile/EOFError/zlib.error for malformed gzip streams (only ValueError is documented); callers in rows.py already handle these, so consider expanding the Raises section in the docstring for clarity.

♻️ Optional docstring tweak
     Raises:
         ValueError: If decompressed data exceeds max_size
+        OSError, gzip.BadGzipFile, EOFError, zlib.error: If gzip data is malformed
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/service/serialization/detection.py` around lines 98 - 106, The docstring
for safe_gzip_decompress should be updated to accurately document the exceptions
it can raise for malformed gzip streams; expand the Raises section to include
OSError (and specifically gzip.BadGzipFile), EOFError, and zlib.error in
addition to ValueError so callers (like code in rows.py) know what to expect;
locate the safe_gzip_decompress function in detection.py and add those exception
types to the docstring with brief descriptions.
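
The sentinel pattern the comment approves of looks roughly like this; a simplified sketch of safe_gzip_decompress, with the 128 MiB default taken from the commit description earlier in this thread:

```python
import gzip
import io


def safe_gzip_decompress(data: bytes, max_size: int = 128 * 1024 * 1024) -> bytes:
    """Decompress gzip data, refusing payloads that expand beyond max_size.

    Reading max_size + 1 bytes and checking the length detects a decompression
    bomb without ever materializing the full decompressed payload.
    """
    with gzip.GzipFile(fileobj=io.BytesIO(data)) as gz:
        result = gz.read(max_size + 1)
    if len(result) > max_size:
        raise ValueError(f"Decompressed data exceeds maximum allowed size of {max_size} bytes")
    return result


compressed = gzip.compress(b"x" * 1000)
assert safe_gzip_decompress(compressed) == b"x" * 1000
```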
src/service/serialization/rows.py (1)

17-17: 💤 Low value

Unused module logger.

logger is defined but never referenced in this module — neither serialization nor deserialization paths emit logs (errors are converted to ValueError). Either drop the logger or add diagnostic logging at error sites for observability.

♻️ Suggested change
-import logging
 import zlib
 ...
-from .encoders import json_decoder_hook, json_encoder
-
-logger = logging.getLogger(__name__)
+from .encoders import json_decoder_hook, json_encoder
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/service/serialization/rows.py` at line 17, The module defines logger =
logging.getLogger(__name__) but never uses it; either remove the unused logger
or add diagnostic logging where errors occur — e.g., in serialize_rows and
deserialize_rows catch blocks log exception details with logger.exception or
logger.error before converting/raising ValueError; ensure the log message
includes context (function name and payload info) and keep the existing
ValueError behavior.
tests/service/serialization/test_rows.py (1)

150-167: ⚡ Quick win

Add a roundtrip test for bytes payloads inside rows.

deserialize_rows was specifically updated to pass object_hook=json_decoder_hook so that bytes encoded as {"__type__": "bytes", "data": ...} are reconstructed. Right now no test covers a row containing bytes end-to-end through serialize_rows/deserialize_rows — only encoder/decoder unit tests exercise the marker shape. A small explicit test would lock in this contract and prevent regression.

♻️ Suggested addition
+    def test_roundtrip_preserves_bytes(self) -> None:
+        """Bytes columns survive the JSON+gzip roundtrip via json_decoder_hook."""
+        original_rows = [[b"\x00\x01\xff", "label-a"], [b"\xde\xad\xbe\xef", "label-b"]]
+
+        serialized = serialize_rows(
+            original_rows, max_void_type_length=MAX_VOID_TYPE_LENGTH
+        )
+        deserialized = deserialize_rows(serialized)
+
+        for i, row in enumerate(deserialized):
+            assert row[0] == original_rows[i][0]
+            assert isinstance(row[0], bytes)
+            assert row[1] == original_rows[i][1]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/service/serialization/test_rows.py` around lines 150 - 167, Add a
roundtrip assertion that includes a row containing a bytes object so
serialize_rows/deserialize_rows preserve bytes via the json_decoder_hook; update
or add a test (e.g., extend test_roundtrip_preserves_data_types) to include a
row like [b"abc", b"\x00\xff"] in original_rows, call serialize_rows(...) and
deserialize_rows(...), and assert the deserialized rows equal the original bytes
values using list(row) comparisons; reference serialize_rows and
deserialize_rows to locate where to add the case.
src/service/serialization/encoders.py (1)

36-44: 💤 Low value

Add np.bool_ handling and test coverage.

np.bool_ is not covered by the current isinstance(obj, (np.integer, np.floating)) check, causing NumPy boolean scalars to raise TypeError. While no np.bool_ usage was found in the codebase, this edge case can occur with boolean DataFrame operations or boolean masks and should be handled for robustness.

♻️ Suggested changes

In src/service/serialization/encoders.py, add np.bool_ to the isinstance check:

-    if isinstance(obj, (np.integer, np.floating)):
+    if isinstance(obj, (np.integer, np.floating, np.bool_)):
         return obj.item()

In tests/service/serialization/test_encoders.py, add a test case:

     def test_encode_numpy_float(self) -> None:
         """Test encoding numpy float scalars."""
         value = np.float64(EXPECTED_FLOAT)
         result = json_encoder(value)

         assert result == EXPECTED_FLOAT
         assert isinstance(result, float)
+
+    def test_encode_numpy_bool(self) -> None:
+        """Test encoding numpy bool scalars."""
+        value = np.bool_(True)
+        result = json_encoder(value)
+
+        assert result is True
+        assert isinstance(result, bool)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/service/serialization/encoders.py` around lines 36 - 44, The encoder
currently misses NumPy boolean scalars (np.bool_) causing them to raise
TypeError; update the isinstance check in the encoder (the block handling
np.integer/np.floating) to include np.bool_ (e.g., isinstance(obj, (np.integer,
np.floating, np.bool_))) and return obj.item() for np.bool_ as well, and add a
unit test in tests/service/serialization/test_encoders.py that verifies
serializing np.bool_(True) yields a Python bool (or JSON true) to cover this
edge case.
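
Putting the pieces above together, an encoder along these lines would cover numpy scalars (including np.bool_), arrays, and binary data while failing fast on anything else; this is a hedged sketch, not the exact json_encoder in encoders.py:

```python
import base64
from typing import Any

import numpy as np


def json_encoder(obj: Any) -> Any:
    """Convert numpy and binary values to JSON-safe types; raise on anything unexpected."""
    if isinstance(obj, (np.integer, np.floating, np.bool_)):
        return obj.item()  # plain int / float / bool
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, (bytes, bytearray)):
        return {"__type__": "bytes", "data": base64.b64encode(bytes(obj)).decode("ascii")}
    # Fail fast instead of falling back to str(obj), which would silently corrupt data.
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


assert json_encoder(np.bool_(True)) is True
assert json_encoder(np.float64(1.5)) == 1.5
assert json_encoder(np.array([1, 2, 3])) == [1, 2, 3]
```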
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/service/data/storage/maria/maria.py`:
- Around line 132-134: The done-callback on self._migration_task currently calls
t.exception() and discards it, hiding async migration errors; change the
callback passed to self._migration_task.add_done_callback to retrieve the
exception (e = t.exception()) and if e is not None log it via the instance
logger (e.g., self._logger.error("migration failed", exc_info=e)) and optionally
trigger fail-fast behavior (raise, set an error flag, or call
self._shutdown/fail handler) so migration failures are visible and actionable;
update the lambda to a named function or small closure to perform logging and
any desired fast-fail handling.
- Around line 250-252: The code currently uses numpy's pickle-capable save/load
for per-cell storage (np.save(..., allow_pickle=True) and np.load(...,
allow_pickle=True)); replace those with a gzip-compressed JSON serialization
using the project's json_encoder (from src/service/serialization/encoders.py)
following the pattern in serialize_rows()/deserialize_rows(): when writing a
cell, JSON-encode the value with json_encoder, compress with gzip into bytes and
store that byte blob instead of np.save output; when reading a cell, decompress
gzip, parse JSON back into Python using the complementary decoder logic to
restore numpy arrays/scalars/binary (base64) to their original types; update the
functions in maria.py that currently call np.save and np.load to use these
JSON+gzip encode/decode steps and remove any allow_pickle usage so
storage/deserialization no longer relies on pickle.

---

Nitpick comments:
In `@src/service/data/storage/exceptions.py`:
- Around line 36-37: The conditional currently uses truthiness on
original_exception which can fail for exceptions overriding __bool__/__len__;
update the check in the exceptions message construction to use an explicit
identity check (replace "if original_exception:" with "if original_exception is
not None:") so the cause is always included when provided; keep the existing
message formatting that appends f" (caused by:
{type(original_exception).__name__}: {original_exception})" unchanged.

In `@src/service/data/storage/pvc.py`:
- Around line 713-728: The isinstance(serialized_data, np.ndarray) check is
redundant because the following not isinstance(serialized_data, bytes) already
covers non-bytes types; update the conversion logic in the try block that
handles serialized_data so it only checks "not isinstance(serialized_data,
bytes)" before calling bytes(serialized_data). Keep the rest of the flow intact
(determining target_class as PartialPayload, KServeInferenceRequest, or
KServeInferenceResponse and calling deserialize_model(serialized_data,
target_class)).
- Around line 290-316: The write_data dispatch is overly verbose: collapse the
redundant ndarray/list checks into a simple three-way branch in write_data —
first, if new_rows is an ndarray and not
list_utils.contains_non_numeric(new_rows) call await
self._write_raw_data(dataset_name, new_rows, column_names); second, elif
list_utils.contains_non_numeric(new_rows) then call serialized =
serialize_rows(new_rows, MAX_VOID_TYPE_LENGTH); arr = np.array(serialized); if
arr.ndim == 1: arr = arr.reshape(-1, 1); then await
self._write_raw_data(dataset_name, arr, column_names, is_bytes=True); else
(numeric non-ndarray case) call await self._write_raw_data(dataset_name,
np.array(new_rows), column_names). This simplifies logic while keeping use of
serialize_rows, MAX_VOID_TYPE_LENGTH, list_utils.contains_non_numeric, np.array,
and _write_raw_data intact.
- Around line 318-346: _read_raw_data currently only logs a warning when
start_row > dataset.shape[0] and returns an empty slice, which hides caller
bugs; change this to raise a clear exception (IndexError or ValueError) instead
of returning an empty array: inside _read_raw_data (working with H5PYContext and
the dataset variable) check if start_row > dataset.shape[0] and raise an
IndexError with a descriptive message (include allocated_dataset_name and the
invalid start_row), so callers of read_data/deserializers like deserialize_rows
get explicit failure rather than silent empty results; keep the existing
behavior for start_row == dataset.shape[0] if you want an empty result, but
ensure the raised exception covers strictly out-of-range starts.
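
A sketch of the collapsed dispatch and the stricter bounds check, assuming _write_raw_data, contains_non_numeric, serialize_rows, and MAX_VOID_TYPE_LENGTH have the shapes the comments above imply; the class name, import paths, and constant value are placeholders.

```python
import numpy as np

from src.service.serialization.rows import serialize_rows
from src.service.utils import list_utils

MAX_VOID_TYPE_LENGTH = 8192  # illustrative; the real constant lives in pvc.py


class PVCStorageSketch:
    """Hypothetical stand-in for the real PVC storage class in pvc.py."""

    async def write_data(self, dataset_name, new_rows, column_names):
        # Three-way dispatch, per the suggestion above.
        if isinstance(new_rows, np.ndarray) and not list_utils.contains_non_numeric(new_rows):
            # Purely numeric ndarray: write it through unchanged.
            await self._write_raw_data(dataset_name, new_rows, column_names)
        elif list_utils.contains_non_numeric(new_rows):
            # Mixed / non-numeric rows: serialize to JSON+gzip void cells first.
            serialized = serialize_rows(new_rows, MAX_VOID_TYPE_LENGTH)
            arr = np.array(serialized)
            if arr.ndim == 1:
                arr = arr.reshape(-1, 1)
            await self._write_raw_data(dataset_name, arr, column_names, is_bytes=True)
        else:
            # Numeric data that is not yet an ndarray.
            await self._write_raw_data(dataset_name, np.array(new_rows), column_names)


def check_start_row(start_row: int, n_dataset_rows: int, allocated_dataset_name: str) -> None:
    """Hypothetical guard for _read_raw_data: fail loudly on out-of-range starts."""
    if start_row > n_dataset_rows:
        raise IndexError(
            f"start_row={start_row} is out of range for dataset "
            f"'{allocated_dataset_name}' ({n_dataset_rows} rows)"
        )
```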

In `@src/service/serialization/detection.py`:
- Around line 98-106: The docstring for safe_gzip_decompress should be updated
to accurately document the exceptions it can raise for malformed gzip streams;
expand the Raises section to include OSError (and specifically
gzip.BadGzipFile), EOFError, and zlib.error in addition to ValueError so callers
(like code in rows.py) know what to expect; locate the safe_gzip_decompress
function in detection.py and add those exception types to the docstring with
brief descriptions.
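
A possible shape for the expanded docstring; the constant name, default value, and the body shown are illustrative only, not the project's actual detection.py implementation.

```python
import gzip
import io

MAX_DECOMPRESSED_SIZE = 128 * 1024 * 1024  # 128 MiB, matching the limit this PR describes


def safe_gzip_decompress(data: bytes, max_size: int = MAX_DECOMPRESSED_SIZE) -> bytes:
    """Decompress gzip data while bounding the output size.

    Raises:
        ValueError: If the decompressed payload would exceed ``max_size``.
        OSError: For malformed gzip streams; ``gzip.BadGzipFile`` (an
            ``OSError`` subclass) covers bad magic bytes and headers.
        EOFError: If the stream ends before the end-of-stream marker.
        zlib.error: For corruption detected in the underlying DEFLATE data.
    """
    with gzip.GzipFile(fileobj=io.BytesIO(data)) as gz:
        out = gz.read(max_size + 1)
    if len(out) > max_size:
        raise ValueError(f"decompressed size exceeds the {max_size}-byte limit")
    return out
```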

In `@src/service/serialization/encoders.py`:
- Around line 36-44: The encoder currently misses NumPy boolean scalars
(np.bool_) causing them to raise TypeError; update the isinstance check in the
encoder (the block handling np.integer/np.floating) to include np.bool_ (e.g.,
isinstance(obj, (np.integer, np.floating, np.bool_))) and return obj.item() for
np.bool_ as well, and add a unit test in
tests/service/serialization/test_encoders.py that verifies serializing
np.bool_(True) yields a Python bool (or JSON true) to cover this edge case.
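
A sketch of the scalar branch with np.bool_ included, plus the kind of test the comment asks for; the encoder shown is a stripped-down stand-in, not the full encoders.py implementation.

```python
import json

import numpy as np


def json_encoder(obj):
    # Stripped-down stand-in: only the scalar branch, with np.bool_ added.
    if isinstance(obj, (np.integer, np.floating, np.bool_)):
        return obj.item()  # np.bool_(True).item() -> True, a plain Python bool
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def test_numpy_bool_roundtrips_as_python_bool():
    encoded = json.dumps({"flag": np.bool_(True)}, default=json_encoder)
    assert json.loads(encoded) == {"flag": True}
```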

In `@src/service/serialization/rows.py`:
- Line 17: The module defines logger = logging.getLogger(__name__) but never
uses it; either remove the unused logger or add diagnostic logging where errors
occur — e.g., in serialize_rows and deserialize_rows catch blocks log exception
details with logger.exception or logger.error before converting/raising
ValueError; ensure the log message includes context (function name and payload
info) and keep the existing ValueError behavior.
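
One way the logging could look while keeping the existing ValueError contract; the decoding logic itself is elided and the caught exception types are assumptions.

```python
import logging

logger = logging.getLogger(__name__)


def deserialize_rows(serialized):
    try:
        ...  # existing gzip + JSON decoding of each cell (elided)
    except (OSError, EOFError, ValueError) as exc:
        # Record the failure with context before converting to the existing ValueError.
        logger.exception("deserialize_rows failed (%d serialized cells)", len(serialized))
        raise ValueError("Failed to deserialize rows") from exc
```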

In `@tests/service/data/test_payload_reconciliation_pvc.py`:
- Around line 384-387: The helper run_async_test creates a new event loop but
never closes it, leaking resources; replace its implementation with
asyncio.run(coro) (which creates and closes the loop for you) or ensure you call
loop.close() after loop.run_until_complete(coro). Locate the run_async_test
function and either change its body to "return asyncio.run(coro)" or wrap
run_until_complete in a try/finally that calls loop.close() to properly tear
down the event loop.
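
A minimal version of the suggested helper using asyncio.run:

```python
import asyncio


def run_async_test(coro):
    # asyncio.run creates a fresh event loop, runs the coroutine to completion,
    # and always closes the loop, so nothing leaks between tests.
    return asyncio.run(coro)
```

If the suite needs to keep managing the loop explicitly instead, the equivalent is wrapping run_until_complete in try/finally and calling loop.close() in the finally block.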

In `@tests/service/serialization/test_rows.py`:
- Around line 150-167: Add a roundtrip assertion that includes a row containing
a bytes object so serialize_rows/deserialize_rows preserve bytes via the
json_decoder_hook; update or add a test (e.g., extend
test_roundtrip_preserves_data_types) to include a row like [b"abc", b"\x00\xff"]
in original_rows, call serialize_rows(...) and deserialize_rows(...), and assert
the deserialized rows equal the original bytes values using list(row)
comparisons; reference serialize_rows and deserialize_rows to locate where to
add the case.
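
A hypothetical test along those lines; the import path and the max_void_type_length value are assumptions.

```python
from src.service.serialization.rows import deserialize_rows, serialize_rows


def test_roundtrip_preserves_bytes():
    original_rows = [[b"abc", b"\x00\xff"]]
    # 128 is an arbitrary max_void_type_length chosen for this sketch.
    serialized = serialize_rows(original_rows, 128)
    deserialized = deserialize_rows(serialized)
    assert [list(row) for row in deserialized] == original_rows
```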
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8afcc8c3-90c6-49e0-aa34-0432d49e49aa

📥 Commits

Reviewing files that changed from the base of the PR and between 536e45f and 3c57111.

📒 Files selected for processing (20)
  • pyproject.toml
  • src/service/data/storage/exceptions.py
  • src/service/data/storage/maria/legacy_maria_reader.py
  • src/service/data/storage/maria/maria.py
  • src/service/data/storage/pvc.py
  • src/service/serialization/__init__.py
  • src/service/serialization/detection.py
  • src/service/serialization/encoders.py
  • src/service/serialization/models.py
  • src/service/serialization/rows.py
  • src/service/utils/list_utils.py
  • tests/service/data/test_payload_reconciliation_maria.py
  • tests/service/data/test_payload_reconciliation_pvc.py
  • tests/service/serialization/__init__.py
  • tests/service/serialization/test_detection.py
  • tests/service/serialization/test_encoders.py
  • tests/service/serialization/test_models.py
  • tests/service/serialization/test_rows.py
  • tests/service/utils/__init__.py
  • tests/service/utils/test_list_utils.py
✅ Files skipped from review due to trivial changes (6)
  • tests/service/utils/__init__.py
  • tests/service/serialization/__init__.py
  • src/service/data/storage/maria/legacy_maria_reader.py
  • tests/service/data/test_payload_reconciliation_maria.py
  • src/service/serialization/__init__.py
  • pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (5)
  • tests/service/utils/test_list_utils.py
  • tests/service/serialization/test_models.py
  • tests/service/serialization/test_encoders.py
  • tests/service/serialization/test_detection.py
  • src/service/serialization/models.py

- Replace np.save/np.load (allow_pickle=True) in MariaDB storage
  with JSON+gzip serialization, completing the CWE-502 remediation
- Add datetime.datetime and datetime.date support to JSON encoder/decoder
  (needed for pandas.Timestamp in metadata columns)
- Improve migration done_callback to log exceptions instead of silently
  swallowing them via _on_migration_done static method

Co-Authored-By: Claude Opus 4.6 <[email protected]>
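
For illustration, the datetime/date branch of the encoder/decoder pair might look roughly like this; the tagging scheme and function shapes are assumptions, not the commit's actual code.

```python
import datetime


def json_encoder(obj):
    # Illustrative datetime/date branch only; other branches elided.
    if isinstance(obj, (datetime.datetime, datetime.date)):
        # pandas.Timestamp subclasses datetime.datetime, so it is covered too.
        return {"__datetime__": obj.isoformat()}
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def json_decoder_hook(dct):
    # A production decoder would likely distinguish date from datetime on the way back.
    if "__datetime__" in dct:
        return datetime.datetime.fromisoformat(dct["__datetime__"])
    return dct
```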
@github-actions

github-actions Bot commented May 7, 2026

PR image build and manifest generation completed successfully!

📦 PR image: quay.io/trustyai/trustyai-service-python-ci:979bf104bc7783a0be92fda9182ad8e2ad71ba8e

🗂️ CI manifests

devFlags:
  manifests:
    - contextDir: config
      sourcePath: ''
      uri: https://api.github.com/repos/trustyai-explainability/trustyai-service-operator-ci/tarball/service-python-979bf104bc7783a0be92fda9182ad8e2ad71ba8e

Add np.bool_ encoder handling, datetime/date roundtrip tests, fix
parameter naming in pvc.py, and resolve ruff UP017/FBT003 lint errors.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
@trustyai-explainability trustyai-explainability deleted a comment from coderabbitai Bot May 8, 2026

Labels

  • enhancement: New feature or request
  • ok-to-test: Proceed with CI testing
  • python: Pull requests that update python code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants