mirror of
https://github.com/docling-project/docling-parse.git
synced 2026-05-17 13:10:49 +00:00
db84017ca7
Signed-off-by: Peter Staar <taa@zurich.ibm.com>
1291 lines
43 KiB
Python
1291 lines
43 KiB
Python
"""Parser for PDF files"""
|
|
|
|
import hashlib
|
|
import logging
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
|
|
|
|
from docling_core.types.doc.base import BoundingBox, CoordOrigin, ImageRefMode
|
|
from docling_core.types.doc.document import ImageRef
|
|
from docling_core.types.doc.page import (
|
|
BitmapResource,
|
|
BoundingRectangle,
|
|
ColorRGBA,
|
|
Coord2D,
|
|
PdfHyperlink,
|
|
PdfMetaData,
|
|
PdfPageBoundaryType,
|
|
PdfPageGeometry,
|
|
PdfShape,
|
|
PdfTableOfContents,
|
|
PdfTextCell,
|
|
PdfWidget,
|
|
SegmentedPdfPage,
|
|
TextCell,
|
|
TextDirection,
|
|
)
|
|
from PIL import Image as PILImage
|
|
from pydantic import BaseModel, ConfigDict
|
|
|
|
from docling_parse.pdf_parsers import ( # type: ignore[import]
|
|
TIMING_KEY_CREATE_LINE_CELLS,
|
|
TIMING_KEY_CREATE_WORD_CELLS,
|
|
TIMING_KEY_DECODE_ANNOTS,
|
|
TIMING_KEY_DECODE_CONTENTS,
|
|
TIMING_KEY_DECODE_DIMENSIONS,
|
|
TIMING_KEY_DECODE_DOCUMENT,
|
|
TIMING_KEY_DECODE_FONTS,
|
|
TIMING_KEY_DECODE_FONTS_TOTAL,
|
|
TIMING_KEY_DECODE_GRPHS,
|
|
TIMING_KEY_DECODE_GRPHS_TOTAL,
|
|
TIMING_KEY_DECODE_PAGE,
|
|
TIMING_KEY_DECODE_RESOURCES,
|
|
TIMING_KEY_DECODE_XOBJECTS,
|
|
TIMING_KEY_DECODE_XOBJECTS_TOTAL,
|
|
TIMING_KEY_EXTRACT_ANNOTS_JSON,
|
|
TIMING_KEY_EXTRACT_DOC_ANNOTATIONS,
|
|
TIMING_KEY_PROCESS_DOCUMENT_FROM_BYTESIO,
|
|
TIMING_KEY_PROCESS_DOCUMENT_FROM_FILE,
|
|
TIMING_KEY_QPDF_PROCESS,
|
|
TIMING_KEY_ROTATE_CONTENTS,
|
|
TIMING_KEY_SANITISE_CONTENTS,
|
|
TIMING_KEY_SANITIZE_CELLS,
|
|
TIMING_KEY_SANITIZE_ORIENTATION,
|
|
TIMING_KEY_TO_JSON_PAGE,
|
|
TIMING_PREFIX_DECODE_FONT,
|
|
TIMING_PREFIX_DECODE_GRPH,
|
|
TIMING_PREFIX_DECODE_PAGE,
|
|
TIMING_PREFIX_DECODE_XOBJECT,
|
|
TIMING_PREFIX_DECODING_PAGE,
|
|
DecodePageConfig, # type: ignore[import]
|
|
PageDecodeResult, # type: ignore[import]
|
|
PdfPageDecoder, # type: ignore[import]
|
|
RenderConfig, # type: ignore[import]
|
|
get_decode_page_timing_keys,
|
|
get_static_timing_keys,
|
|
is_static_timing_key,
|
|
pdf_parser, # type: ignore[import]
|
|
threaded_pdf_parser, # type: ignore[import]
|
|
threaded_pdf_renderer, # type: ignore[import]
|
|
)
|
|
|
|
# Configure logging
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
class PdfTocEntry(BaseModel):
    """PDF table of contents entry (recursive structure).

    Attributes:
        title: The text of the TOC entry
        level: Nesting level in the hierarchy (0 for top level)
        page: Page number this entry points to (optional)
        children: Nested TOC entries (optional)
    """

    # extra="allow": keys that are not declared below are kept on the model
    # instead of being rejected during validation.
    model_config = ConfigDict(extra="allow")

    # The only required field; everything else defaults to None.
    title: str
    level: int | None = None
    page: int | None = None
    # Self-referential field: the quoted name is a forward reference to this
    # very class, which makes the structure recursive.
    children: List["PdfTocEntry"] | None = None
|
|
|
|
|
|
class PdfAnnotations(BaseModel):
    """PDF document annotations including form fields, language, metadata, and table of contents.

    Attributes:
        form: AcroForm data containing interactive form fields (raw dict structure). None if no forms present.
        language: Document language code (e.g., 'en-US', 'fr-FR'). None if not specified.
        meta_xml: XMP metadata as XML string. None if no metadata present.
        table_of_contents: Document outline/bookmark structure as list of entries. None if no TOC.
    """

    # validate_assignment=True: values assigned to fields after construction
    # are validated as well. extra="allow": undeclared keys are kept.
    model_config = ConfigDict(validate_assignment=True, extra="allow")

    # Every field is optional and defaults to None.
    form: Dict[str, Any] | None = None
    language: str | None = None
    meta_xml: str | None = None
    table_of_contents: List[PdfTocEntry] | None = None
|
|
|
|
|
|
class Timings(BaseModel):
    """Timing information from PDF page parsing.

    Provides detailed timing breakdown of the parsing process, useful for
    performance analysis and optimization.

    Attributes:
        data: Dictionary mapping operation names to elapsed time in seconds (summed).
            Common keys include:
            - 'decode_page': Total page decoding time
            - 'decode_dimensions': Time to parse page dimensions
            - 'decode_resources': Time to decode page resources (fonts, etc.)
            - 'decode_contents': Time to decode page content streams
            - 'decode_annots': Time to decode annotations
            - 'create_word_cells': Time to create word cells (if requested)
            - 'create_line_cells': Time to create line cells (if requested)
        raw_data: Dictionary mapping operation names to list of elapsed times.
            This is useful when an operation is repeated multiple times
            (e.g., decoding multiple fonts) and you want to see individual timings.
    """

    model_config = ConfigDict(validate_assignment=True)

    data: Dict[str, float] = {}
    raw_data: Dict[str, List[float]] = {}

    def total(self) -> float:
        """Return the sum of all values stored in ``data``."""
        # NOTE(review): per the class docstring 'decode_page' is itself a
        # total, so this sum may count nested timers twice -- confirm intent.
        summed_values = self.data.values()
        return sum(summed_values)

    def get(self, key: str, default: float = 0.0) -> float:
        """Return the summed timing stored under ``key``, else ``default``."""
        if key in self.data:
            return self.data[key]
        return default

    def get_all(self, key: str) -> List[float]:
        """Return the individual samples recorded under ``key`` (empty list if none)."""
        try:
            return self.raw_data[key]
        except KeyError:
            return []

    def get_count(self, key: str) -> int:
        """Return how many samples were recorded under ``key``."""
        return len(self.get_all(key))

    def __getitem__(self, key: str) -> float:
        # Subscript access mirrors ``data`` directly; a missing key raises KeyError.
        return self.data[key]

    def keys(self):
        """Return a view of the operation names present in ``data``."""
        return self.data.keys()

    def items(self):
        """Return a view of ``(name, seconds)`` pairs from ``data``."""
        return self.data.items()

    def get_static_timings(self) -> Dict[str, float]:
        """Return the entries of ``data`` whose key is a static timing key."""
        static: Dict[str, float] = {}
        for name, seconds in self.data.items():
            if is_static_timing_key(name):
                static[name] = seconds
        return static

    def get_dynamic_timings(self) -> Dict[str, float]:
        """Return the entries of ``data`` whose key is not a static timing key."""
        dynamic: Dict[str, float] = {}
        for name, seconds in self.data.items():
            if not is_static_timing_key(name):
                dynamic[name] = seconds
        return dynamic

    @staticmethod
    def static_keys() -> set:
        """Return the static timing key names reported by ``get_static_timing_keys``."""
        return get_static_timing_keys()

    @staticmethod
    def decode_page_keys() -> List[str]:
        """Return the timing keys reported by ``get_decode_page_timing_keys``."""
        return get_decode_page_timing_keys()
|
|
|
|
|
|
class PdfDocument:
|
|
def __init__(
    self,
    parser: "pdf_parser",
    key: str,
    boundary_type: PdfPageBoundaryType = PdfPageBoundaryType.CROP_BOX,
):
    """Bind a document handle to a parser.

    Args:
        parser: Parser object that all later calls are delegated to.
        key: Key under which the document is known to ``parser``.
        boundary_type: Page boundary used for the default decode config
            and recorded in the page geometry.
    """
    # What identifies the document and how its pages are bounded.
    self._key = key
    self._boundary_type = boundary_type
    self._parser: pdf_parser = parser

    # Caches, all empty until the corresponding getter is first used.
    self._annotations: PdfAnnotations | None = None
    self._meta: PdfMetaData | None = None
    self._toc: PdfTableOfContents | None = None
    self._pages: Dict[int, SegmentedPdfPage] = {}
|
|
|
|
def _default_config(self) -> DecodePageConfig:
    """Build the decode configuration used when the caller supplies none.

    Returns:
        A fresh ``DecodePageConfig`` whose page boundary is this document's
        boundary type and whose sanitization flag is switched off.
    """
    default = DecodePageConfig()
    default.do_sanitization = False
    default.page_boundary = self._boundary_type.value
    return default
|
|
|
|
def is_loaded(self) -> bool:
    """Return whether the parser reports this document's key as loaded."""
    loaded = self._parser.is_loaded(key=self._key)
    return loaded
|
|
|
|
def unload(self) -> bool:
    """Drop all cached pages and unload the document from the parser.

    Returns:
        The parser's ``unload_document`` result when the document was
        loaded, ``False`` when it was not loaded in the first place.
    """
    # The page cache is emptied regardless of the parser-side state.
    self._pages.clear()

    if not self.is_loaded():
        return False
    return self._parser.unload_document(self._key)
|
|
|
|
def unload_pages(self, page_range: tuple[int, int]):
    """Unload the cached pages in the half-open range ``[page_range[0], page_range[1])``.

    Page numbers are 1-based. Only pages present in the page cache are
    unloaded from the parser and evicted; a number below 1 is logged as an
    error.
    """
    start = page_range[0]
    stop = page_range[1]

    for page_no in range(start, stop):
        if page_no < 1:
            _log.error("page_no should always be >=1!")

        if page_no not in self._pages:
            continue

        # The C++ side counts pages from 0, hence the shift by one.
        self._parser.unload_document_page(key=self._key, page=page_no - 1)
        del self._pages[page_no]
|
|
|
|
def number_of_pages(self) -> int:
    """Return the page count reported by the parser for this document.

    Raises:
        RuntimeError: If the document is not loaded.
    """
    if not self.is_loaded():
        raise RuntimeError("This document is not loaded.")

    page_count = self._parser.number_of_pages(key=self._key)
    return page_count
|
|
|
|
def get_meta(self) -> PdfMetaData | None:
    """Return the document metadata, building and caching it on first use.

    A previously cached result is returned without consulting the parser.
    Otherwise the XML obtained from the parser's ``get_meta_xml`` is
    wrapped in a ``PdfMetaData`` and initialised.

    Returns:
        The cached or newly built ``PdfMetaData``; ``None`` when the parser
        returns no XML or something that is not a string.

    Raises:
        RuntimeError: If nothing is cached and the document is not loaded.
    """
    if self._meta is not None:
        return self._meta

    if not self.is_loaded():
        raise RuntimeError("This document is not loaded.")

    xml = self._parser.get_meta_xml(key=self._key)
    if not isinstance(xml, str):
        # Covers both "no metadata" (None) and unexpected payload types.
        return None

    # Finish initialisation before publishing to the cache, so that a
    # failure in initialise() cannot leave a half-built object behind that
    # later calls would return as if it were valid.
    meta = PdfMetaData(xml=xml)
    meta.initialise()
    self._meta = meta

    return self._meta
|
|
|
|
def get_table_of_contents(self) -> PdfTableOfContents | None:
    """Return the table of contents as a tree rooted at a ``"<root>"`` node.

    The tree is built once from the parser's raw table of contents and
    cached; later calls return the cached tree without querying the parser
    again.

    Returns:
        The root ``PdfTableOfContents`` node, or ``None`` when the parser
        reports no table of contents.

    Raises:
        RuntimeError: If the document is not loaded.
    """
    if not self.is_loaded():
        raise RuntimeError("This document is not loaded.")

    # Serve from the cache before touching the parser at all.
    if self._toc is not None:
        return self._toc

    raw_toc = self._parser.get_table_of_contents(key=self._key)
    if raw_toc is None:
        return None

    # Build the complete tree first and only then cache it: if converting
    # the children fails, no empty root is left behind in the cache.
    root = PdfTableOfContents(text="<root>")
    root.children = self._to_table_of_contents(toc=raw_toc)
    self._toc = root

    return self._toc
|
|
|
|
def iterate_pages(
    self,
    *,
    config: DecodePageConfig | None = None,
) -> Iterator[Tuple[int, SegmentedPdfPage]]:
    """Yield ``(page_no, page)`` for every page, in order.

    Args:
        config: Decode configuration passed to ``get_page``; the default
            configuration is used when omitted.

    Yields:
        Tuples of the 1-based page number and the page returned by
        ``get_page`` for it.
    """
    decode_config = self._default_config() if config is None else config

    last_page = self.number_of_pages()
    for page_no in range(1, last_page + 1):
        page = self.get_page(page_no, config=decode_config)
        yield (page_no, page)
|
|
|
|
def _to_table_of_contents(self, toc: dict) -> List[PdfTableOfContents]:
    """Convert raw TOC items into ``PdfTableOfContents`` nodes, recursively.

    Each item is indexed by ``"title"`` and, when the key is present,
    ``"children"``; the children are converted by the same routine.
    """
    # NOTE(review): ``toc`` is annotated as dict but is iterated as a
    # sequence of mapping-like items -- confirm the annotation.

    def _convert(item) -> PdfTableOfContents:
        node = PdfTableOfContents(text=item["title"])
        if "children" in item:
            node.children = self._to_table_of_contents(toc=item["children"])
        return node

    return [_convert(item) for item in toc]
|
|
|
|
def _to_pdf_toc_entry(self, toc_list: List[Dict]) -> List[PdfTocEntry]:
    """Turn a list of raw TOC dicts into ``PdfTocEntry`` objects, recursively.

    A missing ``"title"`` becomes an empty string; ``"level"`` and
    ``"page"`` default to ``None``. Children are converted only when the
    ``"children"`` value is truthy.
    """
    entries = []
    for raw in toc_list:
        converted = PdfTocEntry(
            title=raw.get("title", ""),
            level=raw.get("level"),
            page=raw.get("page"),
        )

        nested = raw.get("children")
        if nested:
            converted.children = self._to_pdf_toc_entry(nested)

        entries.append(converted)
    return entries
|
|
|
|
def get_annotations(self) -> PdfAnnotations | None:
    """Return the document-level annotations, building them on first use.

    The result is cached; a cached value is returned without consulting
    the parser.

    Returns:
        A ``PdfAnnotations`` holding the ``form``, ``language``,
        ``meta_xml`` and ``table_of_contents`` entries of the parser's
        annotation dict, or ``None`` when the parser returns no annotations.

    Raises:
        RuntimeError: If nothing is cached and the document is not loaded.
    """
    if self._annotations is not None:
        return self._annotations

    if not self.is_loaded():
        raise RuntimeError("This document is not loaded.")

    raw = self._parser.get_annotations(key=self._key)
    if raw is None:
        return None

    # The raw TOC is a list of dicts; convert it only when non-empty.
    raw_toc = raw.get("table_of_contents")
    toc_entries = self._to_pdf_toc_entry(raw_toc) if raw_toc else None

    self._annotations = PdfAnnotations(
        form=raw.get("form"),
        language=raw.get("language"),
        meta_xml=raw.get("meta_xml"),
        table_of_contents=toc_entries,
    )
    return self._annotations
|
|
|
|
def get_page(
    self,
    page_no: int,
    *,
    config: DecodePageConfig | None = None,
) -> SegmentedPdfPage:
    """Return the page with the given 1-based number via the typed API.

    Args:
        page_no: Page number to fetch.
        config: Decode configuration; the default one is used when omitted.

    Returns:
        The page produced by ``_get_page_typed``.
    """
    decode_config = config if config is not None else self._default_config()
    return self._get_page_typed(page_no, config=decode_config)
|
|
|
|
def get_page_with_timings(
    self,
    page_no: int,
    *,
    config: DecodePageConfig | None = None,
) -> Tuple[SegmentedPdfPage, Timings]:
    """Decode a page and return it together with its timing data.

    Unlike ``get_page`` this neither reads nor fills the page cache, so
    the timings always stem from a fresh decode.

    Args:
        page_no: Page number (1-indexed).
        config: Page decoding configuration. If None, uses default config.

    Returns:
        Tuple of (SegmentedPdfPage, Timings).

    Raises:
        ValueError: If ``page_no`` lies outside ``1..number_of_pages()``.
    """
    decode_config = self._default_config() if config is None else config

    if page_no < 1 or page_no > self.number_of_pages():
        raise ValueError(
            f"incorrect page_no: {page_no} for key={self._key} (min:1, max:{self.number_of_pages()})"
        )

    return self._get_page_with_timings_typed(page_no, config=decode_config)
|
|
|
|
def _get_page_with_timings_typed(
    self,
    page_no: int,
    *,
    config: DecodePageConfig,
) -> Tuple[SegmentedPdfPage, Timings]:
    """Decode one page through the typed API and collect its timings.

    Args:
        page_no: Page number (1-indexed); converted to the parser's
            0-based index.
        config: Page decoding configuration.

    Returns:
        The converted page and a ``Timings`` built from the decoder's
        summed and raw timing tables.

    Raises:
        ValueError: If the parser returns no page decoder.
    """
    decoder = self._parser.get_page_decoder(
        key=self._key,
        page=page_no - 1,
        config=config,
    )
    if decoder is None:
        raise ValueError(f"Failed to decode page {page_no}")

    page = self._to_segmented_page_from_decoder(
        page_decoder=decoder,
        config=config,
    )

    # Copy both timing tables into plain dicts before handing them to the model.
    summed = dict(decoder.get_timings())
    per_call = dict(decoder.get_timings_raw())

    return page, Timings(data=summed, raw_data=per_call)
|
|
|
|
def load_all_pages(self, config: DecodePageConfig | None = None):
    """Fetch every page once via ``get_page`` so that all pages end up cached.

    Args:
        config: Decode configuration; the default one is used when omitted.
    """
    decode_config = self._default_config() if config is None else config

    page_no = 1
    last_page = self.number_of_pages()
    while page_no <= last_page:
        self.get_page(page_no, config=decode_config)
        page_no += 1
|
|
|
|
def _to_page_geometry_from_decoder(self, page_dim) -> PdfPageGeometry:
    """Convert typed PdfPageDimension to PdfPageGeometry.

    The page rectangle and the art/trim/crop/bleed boxes are all derived
    from the crop box; only the media box uses its own values.
    """
    # NOTE(review): art, trim and bleed boxes mirror the crop box, and the
    # rectangle ignores ``self._boundary_type`` -- confirm this is intended.

    def _bottom_left_bbox(box) -> BoundingBox:
        # ``box`` is indexed as [0..3] and mapped to l, b, r, t in that order.
        return BoundingBox(
            l=box[0],
            b=box[1],
            r=box[2],
            t=box[3],
            coord_origin=CoordOrigin.BOTTOMLEFT,
        )

    crop = page_dim.get_crop_bbox()
    media = page_dim.get_media_bbox()
    rotation = page_dim.get_angle()

    x0, y0, x1, y1 = crop[0], crop[1], crop[2], crop[3]

    # Four corners in the order (x0,y0) -> (x1,y0) -> (x1,y1) -> (x0,y1).
    page_rect = BoundingRectangle(
        r_x0=x0,
        r_y0=y0,
        r_x1=x1,
        r_y1=y0,
        r_x2=x1,
        r_y2=y1,
        r_x3=x0,
        r_y3=y1,
        coord_origin=CoordOrigin.BOTTOMLEFT,
    )

    art_box = _bottom_left_bbox(crop)
    media_box = _bottom_left_bbox(media)
    # One instance, passed for the trim, crop and bleed boxes alike.
    crop_box = _bottom_left_bbox(crop)

    return PdfPageGeometry(
        angle=rotation,
        boundary_type=PdfPageBoundaryType(self._boundary_type),
        rect=page_rect,
        art_bbox=art_box,
        media_bbox=media_box,
        trim_bbox=crop_box,
        crop_bbox=crop_box,
        bleed_bbox=crop_box,
    )
|
|
|
|
def _to_cells_from_decoder(
    self, cells_container
) -> List[Union[PdfTextCell, TextCell]]:
    """Convert typed PdfCells container to list of PdfTextCell objects.

    Every cell keeps its position in the container as ``index``; the cell
    text is used for both ``text`` and ``orig``.
    """

    def _convert(position, cell) -> PdfTextCell:
        if cell.left_to_right:
            direction = TextDirection.LEFT_TO_RIGHT
        else:
            direction = TextDirection.RIGHT_TO_LEFT

        outline = BoundingRectangle(
            r_x0=cell.r_x0,
            r_y0=cell.r_y0,
            r_x1=cell.r_x1,
            r_y1=cell.r_y1,
            r_x2=cell.r_x2,
            r_y2=cell.r_y2,
            r_x3=cell.r_x3,
            r_y3=cell.r_y3,
        )
        return PdfTextCell(
            rect=outline,
            text=cell.text,
            orig=cell.text,
            font_key=cell.font_key,
            font_name=cell.font_name,
            widget=cell.widget,
            text_direction=direction,
            index=position,
            rendering_mode=cell.rendering_mode,
        )

    converted: List[Union[PdfTextCell, TextCell]] = [
        _convert(position, cell) for position, cell in enumerate(cells_container)
    ]
    return converted
|
|
|
|
def _to_shapes_from_decoder(self, shapes_container) -> List[PdfShape]:
    """Convert typed PdfShapes container to list of PdfShape objects.

    A native shape carries flat coordinate arrays plus an index array that
    is read in pairs ``(i0, i1)``; each pair selects the points
    ``i0 .. i1-1`` and produces one ``PdfShape``. All shapes produced from
    the same native shape share its position in the container as ``index``
    and use the offset of their pair in the index array as ``parent_id``.

    Raises:
        IndexError: If an index array has an odd number of entries or
            refers to points beyond the coordinate arrays.
    """
    result: List[PdfShape] = []

    for ind, shape in enumerate(shapes_container):
        indices = shape.get_i()
        num_indices = len(indices)
        if num_indices == 0:
            # No sub-paths: nothing would be emitted for this shape.
            continue

        x_coords = shape.get_x()
        y_coords = shape.get_y()

        # The graphics state belongs to the shape, not to a sub-path, so
        # it is fetched once here instead of once per index pair.
        has_graphics_state = shape.get_has_graphics_state()
        line_width = shape.get_line_width()
        miter_limit = shape.get_miter_limit()
        line_cap = shape.get_line_cap()
        line_join = shape.get_line_join()
        dash_phase = shape.get_dash_phase()
        dash_array = shape.get_dash_array()
        flatness = shape.get_flatness()
        rgb_s = shape.get_rgb_stroking_ops()
        rgb_f = shape.get_rgb_filling_ops()

        for pair_idx in range(0, num_indices, 2):
            i0: int = indices[pair_idx + 0]
            i1: int = indices[pair_idx + 1]

            points: List[Coord2D] = [
                Coord2D(x_coords[k], y_coords[k]) for k in range(i0, i1)
            ]

            # The dash list and colour objects are created per PdfShape so
            # that no two results share a mutable value.
            result.append(
                PdfShape(
                    index=ind,
                    parent_id=pair_idx,
                    points=points,
                    has_graphics_state=has_graphics_state,
                    line_width=line_width,
                    miter_limit=miter_limit,
                    line_cap=line_cap,
                    line_join=line_join,
                    dash_phase=dash_phase,
                    dash_array=list(dash_array),
                    flatness=flatness,
                    rgb_stroking=ColorRGBA(r=rgb_s[0], g=rgb_s[1], b=rgb_s[2]),
                    rgb_filling=ColorRGBA(r=rgb_f[0], g=rgb_f[1], b=rgb_f[2]),
                )
            )

    return result
|
|
|
|
def _to_widgets_from_decoder(self, widgets_container) -> List[PdfWidget]:
    """Convert typed PdfWidgets container to list of PdfWidget objects.

    The corner pair ``(x0, y0)``/``(x1, y1)`` of each widget is expanded to
    a four-corner rectangle; empty text fields are stored as ``None``.
    """
    converted: List[PdfWidget] = []

    for position, item in enumerate(widgets_container):
        x0, y0 = item.x0, item.y0
        x1, y1 = item.x1, item.y1

        # Corners in the order (x0,y0) -> (x1,y0) -> (x1,y1) -> (x0,y1).
        outline = BoundingRectangle(
            r_x0=x0,
            r_y0=y0,
            r_x1=x1,
            r_y1=y0,
            r_x2=x1,
            r_y2=y1,
            r_x3=x0,
            r_y3=y1,
        )

        pdf_widget = PdfWidget(
            index=position,
            rect=outline,
            widget_text=item.text or None,
            widget_description=item.description or None,
            widget_field_name=item.field_name or None,
            widget_field_type=item.field_type or None,
        )
        converted.append(pdf_widget)

    return converted
|
|
|
|
def _to_hyperlinks_from_decoder(self, hyperlinks_container) -> List[PdfHyperlink]:
    """Convert typed PdfHyperlinks container to list of PdfHyperlink objects.

    The corner pair ``(x0, y0)``/``(x1, y1)`` of each link is expanded to a
    four-corner rectangle; an empty URI is stored as ``None``.
    """
    converted: List[PdfHyperlink] = []

    for position, link in enumerate(hyperlinks_container):
        x0, y0 = link.x0, link.y0
        x1, y1 = link.x1, link.y1

        # Corners in the order (x0,y0) -> (x1,y0) -> (x1,y1) -> (x0,y1).
        outline = BoundingRectangle(
            r_x0=x0,
            r_y0=y0,
            r_x1=x1,
            r_y1=y0,
            r_x2=x1,
            r_y2=y1,
            r_x3=x0,
            r_y3=y1,
        )

        pdf_link = PdfHyperlink(
            index=position,
            rect=outline,
            uri=link.uri or None,
        )
        converted.append(pdf_link)

    return converted
|
|
|
|
def _to_bitmap_resources_from_decoder(
    self, images_container
) -> List[BitmapResource]:
    """Convert typed PdfImages container to list of BitmapResource objects.

    For every image a rectangle is built from its corner pair. Extracting
    the pixel data is best effort: when it succeeds the resource embeds an
    RGBA ``ImageRef``, otherwise it stays a placeholder without image.

    Formats ``"jpeg"`` and ``"jp2"`` are opened with PIL from the encoded
    bytes; ``"raw"`` and ``"jbig2"`` are interpreted as raw pixels using
    the mode and size reported by the image. Other formats stay
    placeholders.
    """
    result: List[BitmapResource] = []

    for ind, image in enumerate(images_container):
        # Corners in the order (x0,y0) -> (x1,y0) -> (x1,y1) -> (x0,y1).
        rect = BoundingRectangle(
            r_x0=image.x0,
            r_y0=image.y0,
            r_x1=image.x1,
            r_y1=image.y0,
            r_x2=image.x1,
            r_y2=image.y1,
            r_x3=image.x0,
            r_y3=image.y1,
        )

        image_ref = None
        mode = ImageRefMode.PLACEHOLDER

        try:
            image_bytes = image.get_image_as_bytes()

            if image_bytes:
                fmt = image.get_image_format()
                pil_image: PILImage.Image | None = None

                if fmt in ("jpeg", "jp2"):
                    pil_image = PILImage.open(BytesIO(image_bytes))
                elif fmt in ("raw", "jbig2"):
                    pil_mode = image.get_pil_mode()
                    w = image.image_width
                    h = image.image_height
                    if w > 0 and h > 0:
                        pil_image = PILImage.frombytes(
                            pil_mode, (w, h), image_bytes
                        )

                if pil_image is not None:
                    # Normalize to RGBA for consistent downstream handling
                    if pil_image.mode != "RGBA":
                        pil_image = pil_image.convert("RGBA")

                    # Compute DPI from pixel width and PDF bbox width
                    # (72 PDF units per inch); fall back to 72 when either
                    # is not positive.
                    bbox_width = abs(image.x1 - image.x0)
                    if bbox_width > 0 and image.image_width > 0:
                        dpi = round(image.image_width * 72.0 / bbox_width)
                    else:
                        dpi = 72

                    image_ref = ImageRef.from_pil(pil_image, dpi=dpi)
                    mode = ImageRefMode.EMBEDDED

        except Exception:
            # Deliberate best effort: one undecodable image must not fail
            # the whole page. The traceback is attached so the cause is
            # still visible when debug logging is enabled.
            _log.debug(
                "Failed to extract image data for bitmap, falling back to placeholder",
                exc_info=True,
            )

        bitmap = BitmapResource(
            index=ind, rect=rect, uri=None, image=image_ref, mode=mode
        )
        result.append(bitmap)

    return result
|
|
|
|
def _to_segmented_page_from_decoder(
    self,
    page_decoder,
    *,
    config: DecodePageConfig,
) -> SegmentedPdfPage:
    """Assemble a ``SegmentedPdfPage`` from a typed page decoder.

    Character cells, shapes, widgets, hyperlinks, bitmaps and the page
    geometry are always converted. Word and text-line cells are added only
    when the decoder reports having them.

    Args:
        page_decoder: Decoder object for a single page.
        config: Accepted for signature symmetry; not read by this method.
    """
    chars = self._to_cells_from_decoder(page_decoder.get_char_cells())
    page_shapes = self._to_shapes_from_decoder(page_decoder.get_page_shapes())
    page_widgets = self._to_widgets_from_decoder(page_decoder.get_page_widgets())
    page_links = self._to_hyperlinks_from_decoder(
        page_decoder.get_page_hyperlinks()
    )
    bitmaps = self._to_bitmap_resources_from_decoder(
        page_decoder.get_page_images()
    )
    geometry = self._to_page_geometry_from_decoder(
        page_decoder.get_page_dimension()
    )

    page = SegmentedPdfPage(
        dimension=geometry,
        char_cells=chars,
        word_cells=[],
        textline_cells=[],
        has_chars=len(chars) > 0,
        bitmap_resources=bitmaps,
        shapes=page_shapes,
        widgets=page_widgets,
        hyperlinks=page_links,
    )

    # Word and line cells are optional outputs of the decoder.
    if page_decoder.has_word_cells():
        page.word_cells = self._to_cells_from_decoder(
            page_decoder.get_word_cells()
        )
        page.has_words = len(page.word_cells) > 0

    if page_decoder.has_line_cells():
        page.textline_cells = self._to_cells_from_decoder(
            page_decoder.get_line_cells()
        )
        page.has_lines = len(page.textline_cells) > 0

    return page
|
|
|
|
def _get_page_typed(
    self,
    page_no: int,
    *,
    config: DecodePageConfig,
) -> SegmentedPdfPage:
    """Return a page via the typed API, using the page cache.

    A page that is already cached is returned as is, whatever ``config``
    is passed. Otherwise the page is decoded, converted, stored in the
    cache and returned.

    Args:
        page_no: Page number (1-indexed).
        config: Page decoding configuration.

    Returns:
        SegmentedPdfPage with the parsed page data.

    Raises:
        ValueError: If ``page_no`` is out of range or the parser returns
            no page decoder.
    """
    if page_no in self._pages:
        return self._pages[page_no]

    if page_no < 1 or page_no > self.number_of_pages():
        raise ValueError(
            f"incorrect page_no: {page_no} for key={self._key} (min:1, max:{self.number_of_pages()})"
        )

    # The parser counts pages from 0.
    decoder = self._parser.get_page_decoder(
        key=self._key,
        page=page_no - 1,
        config=config,
    )
    if decoder is None:
        raise ValueError(f"Failed to decode page {page_no}")

    page = self._to_segmented_page_from_decoder(
        page_decoder=decoder,
        config=config,
    )
    self._pages[page_no] = page
    return page
|
|
|
|
|
|
class DoclingPdfParser:
    """Front end that loads PDF documents into a single ``pdf_parser``."""

    def __init__(self, loglevel: str = "fatal"):
        """Create the underlying parser with the given log level.

        Parameters:
            loglevel (str): Logging level as a string.
                One of ['fatal', 'error', 'warning', 'info']
        """
        self.parser = pdf_parser(level=loglevel)

    def set_loglevel(self, loglevel: str):
        """Set the log level using a string label.

        Parameters:
            loglevel (str): Logging level as a string.
                One of ['fatal', 'error', 'warning', 'info']
        """
        self.parser.set_loglevel_with_label(level=loglevel)

    def list_loaded_keys(self) -> List[str]:
        """List the keys of the loaded documents.

        Returns:
            List[str]: A list of keys for the currently loaded documents.
        """
        return self.parser.list_loaded_keys()

    def load(
        self,
        path_or_stream: Union[str, Path, BytesIO],
        lazy: bool = True,
        boundary_type: PdfPageBoundaryType = PdfPageBoundaryType.CROP_BOX,
        password: str | None = None,
    ) -> PdfDocument:
        """Load a document and return a handle to it.

        Parameters:
            path_or_stream: File path (``str`` or ``Path``) or ``BytesIO``
                holding the document.
            lazy: When False, all pages are parsed before returning.
            boundary_type: Page boundary passed on to the ``PdfDocument``.
            password: Optional password; only forwarded for file paths.

        Returns:
            PdfDocument: Handle bound to this parser and the document key.

        Raises:
            TypeError: If ``path_or_stream`` is none of the supported types.
            RuntimeError: If the parser reports that loading failed.
        """
        if isinstance(path_or_stream, str):
            path_or_stream = Path(path_or_stream)

        if isinstance(path_or_stream, Path):
            key = f"key={path_or_stream!s}"  # use filepath as internal handle
            success = self._load_document(
                key=key, filename=str(path_or_stream), password=password
            )

        elif isinstance(path_or_stream, BytesIO):
            hasher = hashlib.sha256(usedforsecurity=False)

            while chunk := path_or_stream.read(8192):
                hasher.update(chunk)
            # Rewind so the parser reads the stream from the start.
            path_or_stream.seek(0)

            key = f"key={hasher.hexdigest()}"  # use sha256 digest as internal handle
            # NOTE(review): ``password`` is not forwarded on this path --
            # confirm whether protected in-memory documents are supported.
            success = self._load_document_from_bytesio(key=key, data=path_or_stream)

        else:
            # Without this branch ``key``/``success`` would be unbound and
            # the caller would get an UnboundLocalError further down.
            raise TypeError(
                f"Expected str, Path, or BytesIO, got {type(path_or_stream)}"
            )

        if not success:
            raise RuntimeError(f"Failed to load document with key {key}")

        result_doc = PdfDocument(
            parser=self.parser, key=key, boundary_type=boundary_type
        )
        if not lazy:  # eagerly parse the pages at init time if desired
            result_doc.load_all_pages()

        return result_doc

    def _load_document(
        self, key: str, filename: str, password: str | None = None
    ) -> bool:
        """Load a document by key and filename.

        Parameters:
            key (str): The unique key to identify the document.
            filename (str): The path to the document file to load; passed
                to the parser UTF-8 encoded.
            password (str, optional): Optional password for password-protected files

        Returns:
            bool: True if the document was successfully loaded, False otherwise.
        """
        return self.parser.load_document(
            key=key, filename=filename.encode("utf8"), password=password
        )

    def _load_document_from_bytesio(self, key: str, data: BytesIO) -> bool:
        """Load a document by key from a BytesIO-like object.

        Parameters:
            key (str): The unique key to identify the document.
            data (BytesIO): A BytesIO-like object containing the document data.

        Returns:
            bool: True if the document was successfully loaded, False otherwise.
        """
        return self.parser.load_document_from_bytesio(key=key, bytes_io=data)
|
|
|
|
|
|
class ThreadedPdfParserConfig(BaseModel):
    """Configuration for the threaded PDF parser.

    Attributes:
        loglevel: Logging level ('fatal', 'error', 'warning', 'info').
        threads: Number of worker threads for parallel page decoding.
        max_concurrent_results: Maximum results buffered before workers pause.
    """

    # arbitrary_types_allowed=True: field types that pydantic cannot validate
    # natively are accepted as they are.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Defaults: quietest log level, 4 worker threads, 32 buffered results.
    loglevel: str = "fatal"
    threads: int = 4
    max_concurrent_results: int = 32
|
|
|
|
|
|
class DoclingThreadedPdfParser:
    """Parser front end that decodes pages of several documents in parallel.

    Documents are registered with ``load``; results are then drained with
    ``has_tasks``/``get_task``.

    Usage::

        parser_config = ThreadedPdfParserConfig(loglevel="fatal", threads=4, max_concurrent_results=32)
        decode_config = DecodePageConfig()

        parser = DoclingThreadedPdfParser(parser_config=parser_config, decode_config=decode_config)

        for source in sources:
            parser.load(source)

        while parser.has_tasks():
            task = parser.get_task()

            if task.success:
                page_decoder, timings = task.get()
            else:
                error_msg = task.error()
    """

    def __init__(
        self,
        parser_config: ThreadedPdfParserConfig | None = None,
        decode_config: DecodePageConfig | None = None,
    ):
        """Create the native threaded parser.

        Parameters:
            parser_config: Thread and buffering settings; defaults are used
                when omitted.
            decode_config: Page decoding configuration; a default
                ``DecodePageConfig`` is used when omitted.
        """
        settings = ThreadedPdfParserConfig() if parser_config is None else parser_config
        page_config = DecodePageConfig() if decode_config is None else decode_config

        self._parser = threaded_pdf_parser(
            loglevel=settings.loglevel,
            num_threads=settings.threads,
            max_concurrent_results=settings.max_concurrent_results,
            config=page_config,
        )

    def load(
        self,
        path_or_stream: Union[str, Path, BytesIO],
        password: str | None = None,
    ) -> str:
        """Register a document for parallel processing.

        Parameters:
            path_or_stream: File path or BytesIO object.
            password: Optional password for protected files.

        Returns:
            str: The document key (derived from the path, or from a sha256
            digest of the stream content).

        Raises:
            TypeError: If ``path_or_stream`` has an unsupported type.
            RuntimeError: If the parser reports that loading failed.
        """
        source = Path(path_or_stream) if isinstance(path_or_stream, str) else path_or_stream

        if isinstance(source, Path):
            location = str(source)
            key = f"key={location}"
            loaded = self._parser.load_document(
                key=key, filename=location.encode("utf8"), password=password
            )
        elif isinstance(source, BytesIO):
            digest = hashlib.sha256(usedforsecurity=False)
            while block := source.read(8192):
                digest.update(block)
            # Rewind so the parser reads the stream from the start.
            source.seek(0)

            key = f"key={digest.hexdigest()}"
            loaded = self._parser.load_document_from_bytesio(
                key=key, bytes_io=source, password=password
            )
        else:
            raise TypeError(
                f"Expected str, Path, or BytesIO, got {type(source)}"
            )

        if loaded:
            return key
        raise RuntimeError(f"Failed to load document with key {key}")

    def has_tasks(self) -> bool:
        """Report whether results remain to be consumed.

        On first call, builds the task queue and starts worker threads.

        Returns:
            bool: True if there are remaining results to consume.
        """
        pending = self._parser.has_tasks()
        return pending

    def get_task(self) -> "PageDecodeResult":
        """Return the next completed page decode result.

        Blocks until a result is available.

        Returns:
            PageDecodeResult: The result with doc_key, page_number, success flag.
                Use task.get() to get (PdfPageDecoder, timings) or task.error() for error message.
        """
        next_result = self._parser.get_task()
        return next_result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Threaded renderer
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ThreadedPdfRendererConfig(BaseModel):
    """Configuration for the threaded PDF renderer.

    Attributes:
        loglevel: Logging level ('fatal', 'error', 'warning', 'info').
        threads: Number of worker threads for parallel page rendering.
        max_concurrent_results: Maximum results buffered before workers pause.
    """

    # arbitrary_types_allowed=True: field types that pydantic cannot validate
    # natively are accepted as they are.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Defaults: quietest log level, 4 worker threads, 32 buffered results.
    loglevel: str = "fatal"
    threads: int = 4
    max_concurrent_results: int = 32
|
|
|
|
|
|
class PdfPageRenderResult:
    """Thin Python wrapper around a raw C++ PageRenderResult.

    Copies the identifying fields of the raw result and adds conversion of
    the rendered pixels to a PIL image.

    Attributes:
        doc_key: Document key the page belongs to.
        page_number: 0-indexed page number.
        success: Whether rendering succeeded.
    """

    def __init__(self, raw):
        """Wrap ``raw`` and mirror its ``doc_key``, ``page_number`` and ``success``."""
        self.success: bool = raw.success
        self.page_number: int = raw.page_number
        self.doc_key: str = raw.doc_key
        self._raw = raw

    def error(self) -> str:
        """Return the error message if rendering failed, empty string otherwise."""
        if self.success:
            return ""
        return self._raw.error_message

    def get(self) -> Tuple[PdfPageDecoder, Dict[str, float]]:
        """Return (page_decoder, timings) for the rendered page.

        Delegates to the wrapped result's ``get()`` so that render results
        can be used like parse results when accessing the decoded page.

        Raises:
            RuntimeError: If the task was not successful.
        """
        decoded = self._raw.get()
        return decoded

    def get_image(self) -> PILImage.Image | None:
        """Convert rendered pixel data to a PIL RGBA Image.

        Returns:
            PIL.Image.Image in RGBA mode, or None if rendering failed or
            produced no pixel data.
        """
        if not self.success:
            return None

        pixels = self._raw.get_image()
        if not pixels:
            return None

        # image_shape is unpacked as (height, width, <third entry>); PIL
        # expects the size as (width, height).
        height, width, _ = self._raw.image_shape
        size = (width, height)
        return PILImage.frombuffer("RGBA", size, pixels, "raw", "RGBA", 0, 1)
|
|
|
|
|
|
class DoclingThreadedPdfRenderer:
    """Threaded PDF renderer that decodes and renders pages from multiple documents in parallel.

    Each result contains both the decoded page data (accessible via the page_decoder)
    and the rendered RGBA image, produced in a single pass.

    Usage::

        render_config = RenderConfig()
        decode_config = DecodePageConfig()
        renderer_config = ThreadedPdfRendererConfig(threads=4)

        renderer = DoclingThreadedPdfRenderer(
            renderer_config=renderer_config,
            decode_config=decode_config,
            render_config=render_config,
        )

        for source in sources:
            renderer.load(source)

        while renderer.has_tasks():
            result = renderer.get_task()
            if result.success:
                image = result.get_image()  # PIL RGBA Image
            else:
                print(result.error())
    """

    def __init__(
        self,
        renderer_config: ThreadedPdfRendererConfig | None = None,
        decode_config: DecodePageConfig | None = None,
        render_config: RenderConfig | None = None,
    ):
        """Create the underlying threaded renderer.

        Parameters:
            renderer_config: Supplies ``loglevel``, ``threads`` and
                ``max_concurrent_results``; a default-constructed
                ``ThreadedPdfRendererConfig`` is used when None.
            decode_config: Page decoding options; a default-constructed
                ``DecodePageConfig`` is used when None.
            render_config: Rendering options; a default-constructed
                ``RenderConfig`` is used when None.
        """
        if renderer_config is None:
            renderer_config = ThreadedPdfRendererConfig()
        if decode_config is None:
            decode_config = DecodePageConfig()
        if render_config is None:
            render_config = RenderConfig()

        self._renderer = threaded_pdf_renderer(
            loglevel=renderer_config.loglevel,
            num_threads=renderer_config.threads,
            max_concurrent_results=renderer_config.max_concurrent_results,
            decode_config=decode_config,
            render_config=render_config,
        )

    def load(
        self,
        path_or_stream: Union[str, Path, BytesIO],
        password: str | None = None,
    ) -> str:
        """Load a document for parallel rendering.

        Parameters:
            path_or_stream: File path or BytesIO object.
            password: Optional password for protected files.

        Returns:
            str: The document key. For paths it is ``"key=<path>"``; for
            streams it is ``"key=<sha256 hex digest of the stream content>"``.

        Raises:
            TypeError: If ``path_or_stream`` is not a str, Path or BytesIO.
            RuntimeError: If the underlying renderer reports that loading failed.
        """
        if isinstance(path_or_stream, str):
            path_or_stream = Path(path_or_stream)

        if isinstance(path_or_stream, Path):
            key = f"key={path_or_stream!s}"
            success = self._renderer.load_document(
                key=key, filename=str(path_or_stream).encode("utf8"), password=password
            )
        elif isinstance(path_or_stream, BytesIO):
            # Rewind before hashing: read() starts at the current position, so
            # a partially consumed stream would otherwise get a key that does
            # not describe its content, and every fully consumed stream would
            # hash the empty input and share one key.
            path_or_stream.seek(0)
            hasher = hashlib.sha256(usedforsecurity=False)
            while chunk := path_or_stream.read(8192):
                hasher.update(chunk)
            # Rewind again so the stream is handed over positioned at its start.
            path_or_stream.seek(0)
            hash_val = hasher.hexdigest()

            key = f"key={hash_val}"
            success = self._renderer.load_document_from_bytesio(
                key=key, bytes_io=path_or_stream, password=password
            )
        else:
            raise TypeError(
                f"Expected str, Path, or BytesIO, got {type(path_or_stream)}"
            )

        if not success:
            raise RuntimeError(f"Failed to load document with key {key}")

        return key

    def has_tasks(self) -> bool:
        """Check if there are remaining tasks to consume.

        On first call, builds the task queue and starts worker threads.

        Returns:
            bool: True if there are remaining results to consume.
        """
        return self._renderer.has_tasks()

    def get_task(self) -> PdfPageRenderResult:
        """Get the next completed page render result.

        Blocks until a result is available.

        Returns:
            PdfPageRenderResult: wraps doc_key, page_number, success, and get_image().
        """
        return PdfPageRenderResult(self._renderer.get_task())
|
|
|
|
|
|
class PdfRenderDocument:
    """Rendering handle for one loaded document.

    Render results are kept in a dict keyed by ``page_number + 1`` of each
    result, which is the same 1-based range that ``get_page`` validates
    against. Nothing is rendered until a page is first requested.
    """

    def __init__(
        self,
        *,
        path_or_stream: Union[Path, bytes],
        parser_doc: PdfDocument,
        renderer_config: ThreadedPdfRendererConfig,
        decode_config: DecodePageConfig,
        render_config: RenderConfig,
        password: str | None = None,
    ):
        """Store the source, the parsed document and the configs.

        Parameters:
            path_or_stream: Either a file path or the raw document bytes.
            parser_doc: Parsed document; used for the page count and unload.
            renderer_config: Passed to each renderer this object creates.
            decode_config: Passed to each renderer this object creates.
            render_config: Passed to each renderer this object creates.
            password: Optional password forwarded when loading the source.
        """
        # Cache of render results, filled by _render_all_pages().
        self._pages: Dict[int, PdfPageRenderResult] = {}
        self._parser_doc = parser_doc
        self._path_or_stream = path_or_stream
        self._password = password
        self._renderer_config = renderer_config
        self._decode_config = decode_config
        self._render_config = render_config

    def _make_renderer(self) -> "DoclingThreadedPdfRenderer":
        """Create a new threaded renderer from the stored configs."""
        renderer = DoclingThreadedPdfRenderer(
            renderer_config=self._renderer_config,
            decode_config=self._decode_config,
            render_config=self._render_config,
        )
        return renderer

    def _load_source(self, renderer: "DoclingThreadedPdfRenderer") -> str:
        """Load the stored source into ``renderer`` and return its document key."""
        source = self._path_or_stream
        if not isinstance(source, Path):
            # Raw bytes are wrapped in a new stream for every load.
            source = BytesIO(source)
        return renderer.load(source, password=self._password)

    def _render_all_pages(self) -> None:
        """Render every page into the cache unless it is already complete.

        Raises:
            RuntimeError: If a page result for this document is unsuccessful.
        """
        if len(self._pages) == self.number_of_pages():
            return

        renderer = self._make_renderer()
        doc_key = self._load_source(renderer)

        while renderer.has_tasks():
            result = renderer.get_task()
            # Only results that carry this document's key are processed.
            if result.doc_key == doc_key:
                if result.success:
                    self._pages[result.page_number + 1] = result
                else:
                    raise RuntimeError(
                        f"Failed to render page {result.page_number + 1}: {result.error()}"
                    )

    def number_of_pages(self) -> int:
        """Return the page count reported by the parsed document."""
        page_count = self._parser_doc.number_of_pages()
        return page_count

    def get_page(self, page_no: int) -> PdfPageRenderResult:
        """Return the render result for a 1-based page number.

        Raises:
            ValueError: If ``page_no`` is outside ``1..number_of_pages()``.
        """
        in_range = 1 <= page_no <= self.number_of_pages()
        if not in_range:
            raise ValueError(
                f"incorrect page_no: {page_no} (min:1, max:{self.number_of_pages()})"
            )

        cache = self._pages
        if page_no not in cache:
            self._render_all_pages()

        return cache[page_no]

    def iterate_pages(self) -> Iterator[Tuple[int, PdfPageRenderResult]]:
        """Yield ``(page_no, result)`` pairs in ascending page order."""
        self._render_all_pages()
        last_page = self.number_of_pages()
        for number in range(1, last_page + 1):
            yield number, self._pages[number]

    def unload(self) -> bool:
        """Clear the page cache and return the result of unloading the parsed document."""
        self._pages.clear()
        unloaded = self._parser_doc.unload()
        return unloaded
|
|
|
|
|
|
class DoclingPdfRenderer:
|
|
def __init__(
    self,
    loglevel: str = "fatal",
    decode_config: DecodePageConfig | None = None,
    render_config: RenderConfig | None = None,
):
    """Set up the parser and the configs shared by every loaded document.

    Parameters:
        loglevel: Passed to both the parser and the renderer config.
        decode_config: Page decoding options; a default-constructed
            ``DecodePageConfig`` is used when this is falsy.
        render_config: Rendering options; a default-constructed
            ``RenderConfig`` is used when this is falsy.
    """
    self._loglevel = loglevel
    self._parser = DoclingPdfParser(loglevel=loglevel)

    # The renderer config is fixed to one thread and one concurrent result.
    self._renderer_config = ThreadedPdfRendererConfig(
        loglevel=loglevel, threads=1, max_concurrent_results=1
    )

    if not decode_config:
        decode_config = DecodePageConfig()
    self._decode_config = decode_config

    if not render_config:
        render_config = RenderConfig()
    self._render_config = render_config
|
|
|
|
def load(
    self,
    path_or_stream: Union[str, Path, BytesIO],
    lazy: bool = True,
    boundary_type: PdfPageBoundaryType = PdfPageBoundaryType.CROP_BOX,
    password: str | None = None,
) -> PdfRenderDocument:
    """Parse a document and wrap it in a ``PdfRenderDocument``.

    Parameters:
        path_or_stream: File path (str or Path) or a BytesIO stream.
        lazy: Forwarded unchanged to the parser's ``load``.
        boundary_type: Forwarded unchanged to the parser's ``load``.
        password: Optional password, given to both the parser and the
            returned render document.

    Returns:
        PdfRenderDocument: Holds the parsed document, the source and
        this renderer's configs.

    Raises:
        TypeError: If ``path_or_stream`` is not a str, Path or BytesIO
            (checked after the parser has loaded the document).
    """
    parsed = self._parser.load(
        path_or_stream=path_or_stream,
        lazy=lazy,
        boundary_type=boundary_type,
        password=password,
    )

    source: Union[Path, bytes]
    if isinstance(path_or_stream, BytesIO):
        # Take the full buffer content as bytes.
        source = path_or_stream.getvalue()
    elif isinstance(path_or_stream, Path):
        source = path_or_stream
    elif isinstance(path_or_stream, str):
        source = Path(path_or_stream)
    else:
        raise TypeError(
            f"Expected str, Path, or BytesIO, got {type(path_or_stream)}"
        )

    document = PdfRenderDocument(
        path_or_stream=source,
        parser_doc=parsed,
        renderer_config=self._renderer_config,
        decode_config=self._decode_config,
        render_config=self._render_config,
        password=password,
    )
    return document
|