Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 84 additions & 12 deletions docling/pipeline/standard_pdf_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@
from typing import Any, Callable, Iterable, List, Optional, Sequence, Tuple, cast

import numpy as np
from docling_core.types.doc import DocItem, ImageRef, PictureItem, TableItem
from docling_core.types.doc import (
DocItem,
ImageRef,
PageItem,
PictureItem,
Size,
TableItem,
)

from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
Expand Down Expand Up @@ -75,11 +82,11 @@
class ThreadedItem:
"""Envelope that travels between pipeline stages."""

payload: Optional[Page]
payload: Page | None
run_id: int # Unique per *execute* call, monotonic across pipeline instance
page_no: int
conv_res: ConversionResult
error: Optional[Exception] = None
error: Exception | None = None
is_failed: bool = False


Expand Down Expand Up @@ -122,7 +129,7 @@ def __init__(self, max_size: int) -> None:
self._closed = False

# ---------------------------------------------------------------- put()
def put(self, item: ThreadedItem, timeout: Optional[float] | None = None) -> bool:
def put(self, item: ThreadedItem, timeout: float | None = None) -> bool:
"""Block until queue accepts *item* or is closed. Returns *False* if closed."""
with self._not_full:
if self._closed:
Expand All @@ -143,9 +150,7 @@ def put(self, item: ThreadedItem, timeout: Optional[float] | None = None) -> boo
return True

# ------------------------------------------------------------ get_batch()
def get_batch(
self, size: int, timeout: Optional[float] | None = None
) -> List[ThreadedItem]:
def get_batch(self, size: int, timeout: float | None = None) -> List[ThreadedItem]:
"""Return up to *size* items. Blocks until ≥1 item present or queue closed/timeout."""
with self._not_empty:
start = time.monotonic()
Expand Down Expand Up @@ -188,16 +193,16 @@ def __init__(
batch_size: int,
batch_timeout: float,
queue_max_size: int,
postprocess: Optional[Callable[[ThreadedItem], None]] = None,
timed_out_run_ids: Optional[set[int]] = None,
postprocess: Callable[[ThreadedItem], None] | None = None,
timed_out_run_ids: set[int] | None = None,
) -> None:
self.name = name
self.model = model
self.batch_size = batch_size
self.batch_timeout = batch_timeout
self.input_queue = ThreadedQueue(queue_max_size)
self._outputs: list[ThreadedQueue] = []
self._thread: Optional[threading.Thread] = None
self._thread: threading.Thread | None = None
self._running = False
self._postprocess = postprocess
self._timed_out_run_ids = (
Expand Down Expand Up @@ -328,7 +333,7 @@ def __init__(
batch_timeout: float,
queue_max_size: int,
model: Any,
timed_out_run_ids: Optional[set[int]] = None,
timed_out_run_ids: set[int] | None = None,
) -> None:
super().__init__(
name="preprocess",
Expand Down Expand Up @@ -502,7 +507,7 @@ def _init_models(self) -> None:
)

# ---------------------------------------------------------------- helpers
def _make_ocr_model(self, art_path: Optional[Path]) -> Any:
def _make_ocr_model(self, art_path: Path | None) -> Any:
factory = get_ocr_factory(
allow_external_plugins=self.pipeline_options.allow_external_plugins
)
Expand Down Expand Up @@ -839,8 +844,75 @@ def _assemble_document(self, conv_res: ConversionResult) -> ConversionResult:
)
)

# Add failed pages to DoclingDocument.pages to preserve page numbering
# This ensures page break markers are generated for skipped/failed pages
self._add_failed_pages_to_document(conv_res)

return conv_res

def _add_failed_pages_to_document(self, conv_res: ConversionResult) -> None:
"""Add failed/skipped pages to DoclingDocument.pages.

This ensures that page break markers are correctly generated for documents
where some pages failed to parse. Without this, export functions would not
know about the missing pages and would generate incorrect page break counts.

The failed pages are added with their size information (if available from
the backend) but without any content.
"""
if conv_res.document is None:
return

# Determine which pages were expected to be processed
start_page, end_page = conv_res.input.limits.page_range
expected_page_nos = set(
range(
max(1, start_page),
min(conv_res.input.page_count, end_page) + 1,
)
)

# Find pages that are missing from the document
existing_page_nos = set(conv_res.document.pages.keys())
missing_page_nos = expected_page_nos - existing_page_nos

if not missing_page_nos:
return

# Try to get size information from the backend for missing pages
backend = conv_res.input._backend
for page_no in sorted(missing_page_nos):
try:
# Attempt to get page size from backend
if isinstance(backend, PdfDocumentBackend):
page_backend = backend.load_page(page_no - 1)
try:
if page_backend.is_valid():
size = page_backend.get_size()
else:
# Use a default size if page backend is invalid
size = Size(width=0.0, height=0.0)
finally:
page_backend.unload()
else:
size = Size(width=0.0, height=0.0)
except Exception:
# If we can't get size, use default
size = Size(width=0.0, height=0.0)

# Add the failed page to the document's pages dict
conv_res.document.pages[page_no] = PageItem(
page_no=page_no,
size=size,
image=None,
)

_log.debug(
"Added %d failed/skipped pages to document: %s",
len(missing_page_nos),
sorted(missing_page_nos),
)

# ---------------------------------------------------------------- misc
@classmethod
def get_default_options(cls) -> ThreadedPdfPipelineOptions:
Expand Down
Loading