Merged
Changes from 2 commits
216 changes: 216 additions & 0 deletions examples/example_notebook_reference_highlight.ipynb

Large diffs are not rendered by default.
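Since the notebook diff is not rendered here, the following is a minimal sketch of the kind of reference highlighting the example presumably demonstrates, using the per-word bounding boxes this PR adds. The `start` kwarg and the `bboxes`/`metadata` segment keys come from the `parse_with_pdfplumber` changes below; the `sample.pdf` path and the assumption that the parser's returned dict exposes the segment list under a top-level "segments" key are hypothetical, not confirmed by this diff.

import pdfplumber

from lexoid.core.parse_type.static_parser import parse_with_pdfplumber

path = "sample.pdf"  # hypothetical input file
result = parse_with_pdfplumber(path, start=0)

with pdfplumber.open(path) as pdf:
    # The "segments" key on the result dict is an assumption.
    for segment, page in zip(result["segments"], pdf.pages):
        im = page.to_image(resolution=150)
        # The PR normalizes bboxes to the 0-1 range, so scale back to PDF points.
        rects = [
            (x0 * page.width, top * page.height, x1 * page.width, bottom * page.height)
            for _, (x0, top, x1, bottom) in segment["bboxes"]
        ]
        im.draw_rects(rects, stroke="red", stroke_width=2)
        im.save(f"highlighted_page_{segment['metadata']['page']}.png")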

153 changes: 105 additions & 48 deletions lexoid/core/parse_type/static_parser.py
@@ -3,7 +3,7 @@
import tempfile
from functools import wraps
from time import time
from typing import Dict, List
from typing import Dict, List, Tuple

import pandas as pd
import pdfplumber
@@ -173,34 +173,6 @@ def parse_with_pdfminer(path: str, **kwargs) -> Dict:
}


def process_table(table) -> str:
"""
Convert a table to markdown format.
"""
# Extract table data
table_data = table.extract()
if not table_data or not table_data[0]: # Check if table is empty
return ""

# Convert to DataFrame and handle empty cells
df = pd.DataFrame(table_data)
df.replace("", pd.NA, inplace=True)
df = df.dropna(how="all", axis=0)
df = df.dropna(how="all", axis=1)
df = df.fillna("")
if len(df) == 0:
return ""

# Use first row as header and clean it up
df.columns = df.iloc[0]
df = df.drop(df.index[0])
df.replace(r"\n", "<br>", regex=True, inplace=True)

# Convert to markdown with some formatting options
markdown_table = df.to_markdown(index=False, tablefmt="pipe")
return f"\n{markdown_table}\n\n"


def embed_links_in_text(page, text, links):
"""
Embed hyperlinks inline within the text, matching their position based on rectangles.
@@ -266,7 +238,9 @@ def embed_email_links(text: str) -> str:
return email_pattern.sub(lambda match: f"<{match.group('email')}>", text)


def process_pdf_page_with_pdfplumber(page, uri_rects, **kwargs):
def process_pdf_page_with_pdfplumber(
page, uri_rects, **kwargs
) -> Tuple[str, List[Tuple[str, Tuple[float, float, float, float]]]]:
"""
Process a single page's content and return formatted markdown text.
"""
@@ -277,6 +251,10 @@ def process_pdf_page_with_pdfplumber(page, uri_rects, **kwargs):
x_tolerance = kwargs.get("x_tolerance", 1)
y_tolerance = kwargs.get("y_tolerance", 5)
next_h_line_idx = 0
word_bboxes = []

page_width = float(page.width)
page_height = float(page.height)

# First detect horizontal lines that could be markdown rules
horizontal_lines = []
@@ -302,6 +280,57 @@ def process_pdf_page_with_pdfplumber(page, uri_rects, **kwargs):
snap_x_tolerance = kwargs.get("snap_x_tolerance", 10)
snap_y_tolerance = kwargs.get("snap_y_tolerance", 0)

def process_table(table):
table_data = table.extract()
if not table_data or not table_data[0]:
return "", []

df = pd.DataFrame(table_data)
df.replace("", pd.NA, inplace=True)
df = df.dropna(how="all", axis=0).dropna(how="all", axis=1)
df = df.fillna("")
if len(df) == 0:
return "", []

df.columns = df.iloc[0]
df = df.drop(df.index[0])
df.replace(r"\n", "<br>", regex=True, inplace=True)

markdown_table = df.to_markdown(index=False, tablefmt="pipe")
markdown_table = f"\n{markdown_table}\n\n"

words_on_page = page.extract_words(
extra_attrs=["top", "bottom", "fontname", "size"],
)

def intersects(word_bbox, cell_bbox):
wx0, wtop, wx1, wbot = word_bbox
cx0, ctop, cx1, cbot = cell_bbox
x_overlap = (wx0 <= cx1) and (wx1 >= cx0)
y_overlap = (wtop <= cbot) and (wbot >= ctop)
return x_overlap and y_overlap

table_bboxes = []
for cell in table.cells: # cell is a tuple: (x0, top, x1, bottom)
cx0, ctop, cx1, cbot = cell
cell_bbox = (cx0, ctop, cx1, cbot)

for w in words_on_page:
word_bbox = (w["x0"], w["top"], w["x1"], w["bottom"])
if intersects(word_bbox, cell_bbox):
text = (w.get("text") or "").strip()
if not text:
continue
norm_bbox = (
w["x0"] / page_width,
w["top"] / page_height,
w["x1"] / page_width,
w["bottom"] / page_height,
)
table_bboxes.append((text, norm_bbox))

return markdown_table, table_bboxes

tables = page.find_tables(
table_settings={
"vertical_strategy": vertical_strategy,
@@ -310,11 +339,14 @@ def process_pdf_page_with_pdfplumber(page, uri_rects, **kwargs):
"snap_y_tolerance": snap_y_tolerance,
}
)
table_zones = [(table.bbox, process_table(table)) for table in tables]
table_zones = []
for table in tables:
table_md, table_bboxes = process_table(table)
table_zones.append((table.bbox, table_md, table_bboxes))

# Create a filtered page excluding table areas
filtered_page = page
for table_bbox, _ in table_zones:
for table_bbox, _, _ in table_zones:
filtered_page = filtered_page.filter(
lambda obj: get_bbox_overlap(obj_to_bbox(obj), table_bbox) is None
)
@@ -395,6 +427,16 @@ def apply_markdown_formatting(text, formatting):
text = f"*{text}*"
return text

def normalize_bbox(bbox):
"""Convert PDF bbox to normalized coordinates (0-1)."""
x0, top, x1, bottom = bbox
return (
x0 / page_width,
top / page_height,
x1 / page_width,
bottom / page_height,
)

def format_paragraph(text_elements):
"""
Format a paragraph with styling applied to individual words.
@@ -415,12 +457,12 @@ def format_paragraph(text_elements):
formatting = get_text_formatting(element)

if formatting.get("monospace", False):
# Wrap monospace words with backticks
formatted_words.append(f"`{text}`")
formatted_word = f"`{text}`"
else:
all_monospace = False
# Apply other markdown formatting
formatted_words.append(apply_markdown_formatting(text, formatting))
formatted_word = apply_markdown_formatting(text, formatting)
formatted_words.append(formatted_word)
word_bboxes.append((formatted_word, normalize_bbox(obj_to_bbox(element))))

# If all words are monospace, format as a code block
if all_monospace:
@@ -457,14 +499,15 @@ def detect_heading_level(font_size, body_font_size):
return None

tables = []
for bbox, table_md in table_zones:
for bbox, table_md, table_bboxes in table_zones:
tables.append(
(
"table",
{
"top": bbox[1],
"bottom": bbox[3],
"content": table_md,
"bboxes": table_bboxes,
},
)
)
@@ -512,6 +555,7 @@ def detect_heading_level(font_size, body_font_size):
current_paragraph = []
# Add the table
markdown_content.append(element["content"])
word_bboxes.extend(element["bboxes"])
last_y = element["bottom"]
elif element_type == "horizontal_line":
while (next_h_line_idx < len(horizontal_lines)) and (
@@ -548,8 +592,9 @@ def detect_heading_level(font_size, body_font_size):
markdown_content.append(format_paragraph(current_paragraph))
current_paragraph = []

indent_level = detect_indentation_level(word, base_left)
current_paragraph.append(("indent", indent_level))
if heading_level is None:
indent_level = detect_indentation_level(word, base_left)
current_paragraph.append(("indent", indent_level))

# Add word to appropriate collection
if heading_level:
@@ -600,14 +645,19 @@ def detect_heading_level(font_size, body_font_size):
.replace("\n```\n\n```", "")
)

return content
return content, word_bboxes


def process_pdf_with_pdfplumber(path: str, **kwargs) -> List[str]:
def process_pdf_with_pdfplumber(
path: str, **kwargs
) -> List[Tuple[str, List[Tuple[str, Tuple[float, float, float, float]]]]]:
"""
Process PDF and return a list of markdown-formatted strings, one per page.
Process PDF and return a list of (markdown, word_bboxes) per page.

Returns: List[Tuple[str, List[Tuple[str, Tuple[float, float, float, float]]]]]
Each element pairs a page's markdown text with its word bounding boxes: (markdown_text, [(word, (x0, top, x1, bottom))]), where bbox coordinates are normalized to the 0-1 range relative to page width and height.
"""
page_texts = []
page_data = []

with tempfile.TemporaryDirectory() as temp_dir:
paths = split_pdf(path, temp_dir, pages_per_split=1)
@@ -616,12 +666,12 @@ def process_pdf_with_pdfplumber(path: str, **kwargs) -> List[str]:
uri_rects = get_uri_rect(split_path)
with pdfplumber.open(split_path) as pdf:
for page in pdf.pages:
page_content = process_pdf_page_with_pdfplumber(
page_content, word_bboxes = process_pdf_page_with_pdfplumber(
page, uri_rects, **kwargs
)
page_texts.append(page_content.strip())
page_data.append((page_content.strip(), word_bboxes))

return page_texts
return page_data


def parse_with_pdfplumber(path: str, **kwargs) -> Dict:
@@ -631,9 +681,16 @@ def parse_with_pdfplumber(path: str, **kwargs) -> Dict:
Returns:
Dict: Dictionary containing parsed document data
"""
page_texts = process_pdf_with_pdfplumber(path)
page_data = process_pdf_with_pdfplumber(path)
page_texts = [p[0] for p in page_data]
page_bboxes = [p[1] for p in page_data]

segments = [
{"metadata": {"page": kwargs["start"] + page_num}, "content": page_text}
{
"metadata": {"page": kwargs["start"] + page_num},
"content": page_text,
"bboxes": page_bboxes[page_num - 1],
}
for page_num, page_text in enumerate(page_texts, start=1)
]

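Because this PR changes the return type of process_pdf_with_pdfplumber from List[str] to a list of (markdown, word_bboxes) tuples, existing callers need a small unpacking change. A minimal migration sketch, assuming a caller that previously joined the page texts; the sample.pdf path is hypothetical.

from lexoid.core.parse_type.static_parser import process_pdf_with_pdfplumber

path = "sample.pdf"  # hypothetical input file

# Before this PR the function returned List[str], one markdown string per page:
#     full_text = "\n\n".join(process_pdf_with_pdfplumber(path))
# Now each page is a (markdown, word_bboxes) tuple, so unpack both parts.
page_data = process_pdf_with_pdfplumber(path)
full_text = "\n\n".join(text for text, _ in page_data)
word_bboxes_per_page = [bboxes for _, bboxes in page_data]

# Each bbox is (x0, top, x1, bottom), normalized to 0-1 relative to page size.
for word, (x0, top, x1, bottom) in word_bboxes_per_page[0][:5]:
    print(f"{word!r}: left={x0:.3f} top={top:.3f} right={x1:.3f} bottom={bottom:.3f}")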