Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 39 additions & 17 deletions docling_core/transforms/chunker/hybrid_chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
from pydantic import BaseModel, ConfigDict, Field, computed_field, model_validator

from docling_core.transforms.chunker.hierarchical_chunker import (
ChunkingDocSerializer,
ChunkingSerializerProvider,
)
from docling_core.transforms.chunker.line_chunker import LineBasedTokenChunker
from docling_core.transforms.chunker.tokenizer.base import BaseTokenizer
from docling_core.types.doc.document import SectionHeaderItem, TitleItem
from docling_core.transforms.chunker.tokenizer.huggingface import get_default_tokenizer
from docling_core.transforms.serializer.base import BaseDocSerializer
from docling_core.types.doc.document import SectionHeaderItem, TableItem, TitleItem

try:
import semchunk
Expand All @@ -32,20 +36,11 @@
HierarchicalChunker,
)
from docling_core.transforms.serializer.base import (
BaseDocSerializer,
BaseSerializerProvider,
)
from docling_core.types import DoclingDocument


def _get_default_tokenizer():
from docling_core.transforms.chunker.tokenizer.huggingface import (
HuggingFaceTokenizer,
)

return HuggingFaceTokenizer.from_pretrained(model_name="sentence-transformers/all-MiniLM-L6-v2")


class HybridChunker(BaseChunker):
r"""Chunker doing tokenization-aware refinements on top of document layout chunking.

Expand All @@ -54,13 +49,15 @@ class HybridChunker(BaseChunker):
respective pretrained model
max_tokens: The maximum number of tokens per chunk. If not set, limit is
resolved from the tokenizer
repeat_table_header: Whether to repeat a table header if the table is chunked
merge_peers: Whether to merge undersized chunks sharing same relevant metadata
always_emit_headings: Whether to emit headings even for empty sections
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

tokenizer: BaseTokenizer = Field(default_factory=_get_default_tokenizer)
tokenizer: BaseTokenizer = Field(default_factory=get_default_tokenizer)
repeat_table_header: bool = True
merge_peers: bool = True

serializer_provider: BaseSerializerProvider = ChunkingSerializerProvider()
Expand Down Expand Up @@ -93,7 +90,7 @@ def _patch(cls, data: Any) -> Any:
max_tokens=max_tokens,
)
elif tokenizer is None or isinstance(tokenizer, PreTrainedTokenizerBase):
kwargs = {"tokenizer": tokenizer or _get_default_tokenizer().tokenizer}
kwargs = {"tokenizer": tokenizer or get_default_tokenizer().tokenizer}
if max_tokens is not None:
kwargs["max_tokens"] = max_tokens
data["tokenizer"] = HuggingFaceTokenizer(**kwargs)
Expand Down Expand Up @@ -215,6 +212,7 @@ def _split_by_doc_items(self, doc_chunk: DocChunk, doc_serializer: BaseDocSerial
def _split_using_plain_text(
self,
doc_chunk: DocChunk,
doc_serializer: BaseDocSerializer,
) -> list[DocChunk]:
lengths = self._doc_chunk_length(doc_chunk)
if lengths.total_len <= self.max_tokens:
Expand All @@ -223,7 +221,7 @@ def _split_using_plain_text(
# How much room is there for text after subtracting out the headers and
# captions:
available_length = self.max_tokens - lengths.other_len
sem_chunker = semchunk.chunkerify(self.tokenizer.get_tokenizer(), chunk_size=available_length)

if available_length <= 0:
warnings.warn(
"Headers and captions for this chunk are longer than the total "
Expand All @@ -233,12 +231,36 @@ def _split_using_plain_text(
new_chunk = DocChunk(**doc_chunk.export_json_dict())
new_chunk.meta.captions = None
new_chunk.meta.headings = None
return self._split_using_plain_text(doc_chunk=new_chunk)
text = doc_chunk.text
segments = sem_chunker.chunk(text)
return self._split_using_plain_text(doc_chunk=new_chunk, doc_serializer=doc_serializer)

segments = self.segment(doc_chunk, available_length, doc_serializer)
chunks = [DocChunk(text=s, meta=doc_chunk.meta) for s in segments]
return chunks

def segment(self, doc_chunk: DocChunk, available_length: int, doc_serializer: BaseDocSerializer) -> list[str]:
    """Split the chunk text into segments of at most `available_length` tokens.

    A chunk backed by a single table item is split line by line (optionally
    repeating the table header in every segment); any other chunk falls back
    to semantic splitting via semchunk.
    """
    is_single_table = (
        len(doc_chunk.meta.doc_items) == 1
        and isinstance(doc_chunk.meta.doc_items[0], TableItem)
    )
    if (
        self.repeat_table_header
        and isinstance(doc_serializer, ChunkingDocSerializer)
        and is_single_table
    ):
        header_lines, body_lines = doc_serializer.table_serializer.get_header_and_body_lines(
            table_text=doc_chunk.text
        )
        table_chunker = LineBasedTokenChunker(
            tokenizer=self.tokenizer,
            max_tokens=available_length,
            prefix="\n".join(header_lines),
            serializer_provider=self.serializer_provider,
        )
        return table_chunker.chunk_text(lines=body_lines)

    plain_chunker = semchunk.chunkerify(self.tokenizer.get_tokenizer(), chunk_size=available_length)
    return plain_chunker.chunk(doc_chunk.text)

def _merge_chunks_with_matching_metadata(self, chunks: list[DocChunk]):
output_chunks = []
window_start = 0
Expand Down Expand Up @@ -306,7 +328,7 @@ def chunk(
**kwargs,
) # type: ignore
res = [x for c in res for x in self._split_by_doc_items(c, doc_serializer=my_doc_ser)]
res = [x for c in res for x in self._split_using_plain_text(c)]
res = [x for c in res for x in self._split_using_plain_text(c, doc_serializer=my_doc_ser)]
if self.merge_peers:
res = self._merge_chunks_with_matching_metadata(res)
return iter(res)
222 changes: 222 additions & 0 deletions docling_core/transforms/chunker/line_chunker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import warnings
from collections.abc import Iterator
from functools import cached_property
from typing import Annotated, Any, Optional

from pydantic import ConfigDict, Field, computed_field

from docling_core.transforms.chunker import BaseChunk, BaseChunker, DocChunk, DocMeta
from docling_core.transforms.chunker.hierarchical_chunker import (
    ChunkingSerializerProvider,
)
from docling_core.transforms.chunker.tokenizer.base import BaseTokenizer
from docling_core.transforms.chunker.tokenizer.huggingface import get_default_tokenizer
from docling_core.transforms.serializer.base import (
    BaseSerializerProvider,
)
from docling_core.types import DoclingDocument


class LineBasedTokenChunker(BaseChunker):
    """Tokenization-aware chunker that preserves line boundaries.

    This chunker serializes the document content into text and attempts to keep lines
    intact within chunks. It only splits a line if it exceeds the maximum token limit on
    its own. This is particularly useful for structured content like tables, code, or logs
    where line boundaries are semantically important.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    tokenizer: Annotated[
        BaseTokenizer,
        Field(
            default_factory=get_default_tokenizer,
            description="The tokenizer to use; either instantiated object or name or path of respective pretrained model",
        ),
    ]

    # FIX: `max_tokens` used to be a read-only @property. A `max_tokens=...`
    # constructor argument (as passed by HybridChunker.segment) was silently
    # dropped by pydantic (default extra="ignore"), so the caller-supplied
    # budget was ignored and the tokenizer's full limit was used instead.
    # It is now a real field, resolved from the tokenizer when left unset.
    max_tokens: Annotated[
        Optional[int],
        Field(
            default=None,
            description="Maximum number of tokens per chunk. If not set, limit is resolved from the tokenizer",
        ),
    ]

    prefix: Annotated[
        str,
        Field(
            default="",
            description="Text that appears at the beginning of each chunk. Useful for adding context like table headers",
        ),
    ]

    serializer_provider: Annotated[
        BaseSerializerProvider,
        Field(
            default_factory=ChunkingSerializerProvider,
            description="Provider for document serialization during chunking",
        ),
    ]

    @computed_field  # type: ignore[misc]
    @cached_property
    def prefix_len(self) -> int:
        """Cached token count of the prefix, computed on first access.

        Returns 0 (and warns) when the prefix alone would not fit into a
        chunk, so that the prefix is effectively disabled.
        """
        token_count = self.tokenizer.count_tokens(self.prefix)
        if token_count >= self.max_tokens:
            warnings.warn(
                f"Chunks prefix: {self.prefix} is too long for chunk size {self.max_tokens} and will be ignored"
            )
            return 0
        return token_count

    def model_post_init(self, __context) -> None:
        """Resolve the token budget and validate the prefix length."""
        # Resolve the budget from the tokenizer when the caller did not set one.
        if self.max_tokens is None:
            self.max_tokens = self.tokenizer.get_max_tokens()
        # Trigger computation of prefix_len to validate prefix length
        _ = self.prefix_len
        if self.prefix_len == 0 and self.prefix:
            # prefix_len == 0 with a non-empty prefix means the prefix was too
            # long for the chunk size; drop it entirely.
            self.prefix = ""

    def chunk(self, dl_doc: DoclingDocument, **kwargs: Any) -> Iterator[BaseChunk]:
        """Chunk the provided document using line-based token-aware chunking.

        Args:
            dl_doc (DoclingDocument): document to chunk

        Yields:
            Iterator[BaseChunk]: iterator over extracted chunks
        """
        my_doc_ser = self.serializer_provider.get_serializer(doc=dl_doc)

        # Serialize the entire document to get the text
        ser_res = my_doc_ser.serialize()

        if not ser_res.text:
            return

        # keepends=True so existing line terminators are preserved in chunks
        text_chunks = self.chunk_text(lines=ser_res.text.splitlines(True))

        # Yield DocChunk objects for each text chunk
        for chunk_text in text_chunks:
            yield DocChunk(
                text=chunk_text,
                meta=DocMeta(
                    doc_items=ser_res.get_unique_doc_items(),
                    headings=None,
                    origin=dl_doc.origin,
                ),
            )

    def chunk_text(self, lines: list[str]) -> list[str]:
        """Pack *lines* into chunks of at most ``max_tokens`` tokens each.

        Lines are kept intact whenever possible; a line is only split when it
        cannot fit into an empty chunk on its own. Each chunk starts with
        ``self.prefix`` (if set).
        """
        chunks: list[str] = []
        current = self.prefix
        current_len = self.prefix_len

        for line in lines:
            remaining = line

            while True:
                line_tokens = self.tokenizer.count_tokens(remaining)
                available = self.max_tokens - current_len

                # If the remaining part fits entirely into current chunk → append and stop
                if line_tokens <= available:
                    current += remaining
                    current_len += line_tokens
                    break

                # Remaining does NOT fit into current chunk.
                # If it CAN fit into a fresh chunk → flush current and start new one.
                if line_tokens + self.prefix_len <= self.max_tokens:
                    chunks.append(current)
                    current = self.prefix
                    current_len = self.prefix_len
                    # loop continues to retry fitting `remaining`
                    continue

                # Remaining is too large even for an empty chunk → split it.
                # Split off the first segment that fits into current.
                take, remaining = self.split_by_token_limit(remaining, available)

                # FIX: defensive progress guard — an empty head here (e.g. when
                # `available <= 0`, or the very first character already exceeds
                # the budget) previously made this loop spin forever, emitting
                # prefix-only chunks.
                if not take:
                    if current != self.prefix:
                        # Nothing fits into the leftover space: flush and retry
                        # with a fresh chunk.
                        chunks.append(current)
                        current = self.prefix
                        current_len = self.prefix_len
                        continue
                    # Even a fresh chunk cannot take the first token: force
                    # minimal progress (may slightly exceed the budget).
                    take, remaining = remaining[:1], remaining[1:]

                # Add the taken part.
                # NOTE(review): lines from splitlines(keepends=True) already
                # carry their terminator, so this "\n" may insert an extra
                # blank line mid-split — confirm intended.
                current += "\n" + take
                current_len += self.tokenizer.count_tokens(take)

                # flush the current chunk (full)
                chunks.append(current)
                current = self.prefix
                current_len = self.prefix_len

            # end while for this line

        # push final chunk if non-empty
        if current != self.prefix:
            chunks.append(current)

        return chunks

    def split_by_token_limit(
        self,
        text: str,
        token_limit: int,
        prefer_word_boundary: bool = True,
    ) -> tuple[str, str]:
        """
        Split `text` into (head, tail) where `head` has at most `token_limit` tokens,
        and `tail` is the remainder. Uses binary search on character indices to minimize
        calls to `count_tokens`.

        Parameters
        ----------
        text : str
            Input string to split.
        token_limit: int
            Maximum number of tokens allowed in the head.
        prefer_word_boundary : bool
            If True, try to end the head on a whitespace boundary (without violating
            the token limit). If no boundary exists in range, fall back to the
            exact max index found by search.

        Returns
        -------
        (head, tail) : tuple[str, str]
            `head` contains at most `token_limit` tokens, `tail` is the remaining suffix.
            If `token_limit <= 0`, returns ("", text).
        """
        if token_limit <= 0 or not text:
            return "", text

        # if the whole text already fits, return as is.
        if self.tokenizer.count_tokens(text) <= token_limit:
            return text, ""

        # Binary search over character indices [0, len(text)]
        lo, hi = 0, len(text)
        best_idx: Optional[int] = None

        while lo <= hi:
            mid = (lo + hi) // 2
            head = text[:mid]
            tok_count = self.tokenizer.count_tokens(head)

            if tok_count <= token_limit:
                best_idx = mid  # feasible; try to extend
                lo = mid + 1
            else:
                hi = mid - 1

        if best_idx is None or best_idx <= 0:
            # Even the first character exceeds the limit (e.g., tokenizer behavior).
            # Return nothing in head, everything in tail.
            return "", text

        # Optionally adjust to a previous whitespace boundary without violating the limit
        if prefer_word_boundary:
            # Search backwards from best_idx to find whitespace; keep within token limit.
            last_space_index = text[:best_idx].rfind(" ")
            # FIX: was `>= 0`; a space at index 0 produced an empty head,
            # stalling callers that rely on forward progress.
            if last_space_index > 0:
                best_idx = last_space_index

        head, tail = text[:best_idx], text[best_idx:]
        return head, tail
6 changes: 6 additions & 0 deletions docling_core/transforms/chunker/tokenizer/huggingface.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,9 @@ def from_pretrained(
def get_tokenizer(self):
"""Get underlying tokenizer object."""
return self.tokenizer


def get_default_tokenizer():
    """Build the default tokenizer instance.

    Loads the HuggingFace tokenizer for the standard sentence-transformers
    MiniLM model used across docling chunkers.
    """
    default_model = "sentence-transformers/all-MiniLM-L6-v2"
    return HuggingFaceTokenizer.from_pretrained(model_name=default_model)
29 changes: 29 additions & 0 deletions docling_core/transforms/serializer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,35 @@ def serialize(
"""Serializes the passed item."""
...

def get_header_and_body_lines(
    self,
    *,
    table_text: str,
    **kwargs: Any,
) -> tuple[list[str], list[str]]:
    """Get header lines and body lines from the table.

    Splits a serialized table into header and body sections, which is useful
    for repeating headers when a table spans multiple chunks.

    Args:
        table_text: The serialized table text to split.

    Returns:
        A tuple of (header_lines, body_lines): header_lines is a list of
        table-header strings; body_lines is a list of table-body row strings.

    Note:
        This default implementation performs no header detection: it returns
        empty header lines and every non-blank line of the input as body.
        Subclasses should override to provide format-specific splitting logic.
    """
    header_lines: list[str] = []
    body_lines: list[str] = []
    for raw_line in table_text.split("\n"):
        # keep only lines with visible content; blank separators are dropped
        if raw_line.strip():
            body_lines.append(raw_line)
    return header_lines, body_lines


class BasePictureSerializer(ABC):
"""Base class for picture item serializers."""
Expand Down
Loading
Loading