Files
docling-parse/docling_parse/pdf_parser.py
T
Peter W. J. Staar e7ef57fbf6 feat: extend the renderer (#245)
Signed-off-by: Peter Staar <taa@zurich.ibm.com>
2026-04-01 06:48:09 +02:00

1162 lines
39 KiB
Python

"""Parser for PDF files"""
import hashlib
import logging
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from docling_core.types.doc.base import BoundingBox, CoordOrigin, ImageRefMode
from docling_core.types.doc.document import ImageRef
from docling_core.types.doc.page import (
BitmapResource,
BoundingRectangle,
ColorRGBA,
Coord2D,
PdfHyperlink,
PdfMetaData,
PdfPageBoundaryType,
PdfPageGeometry,
PdfShape,
PdfTableOfContents,
PdfTextCell,
PdfWidget,
SegmentedPdfPage,
TextCell,
TextDirection,
)
from PIL import Image as PILImage
from pydantic import BaseModel, ConfigDict
from docling_parse.pdf_parsers import DecodePageConfig # type: ignore[import]
from docling_parse.pdf_parsers import PageDecodeResult # type: ignore[import]
from docling_parse.pdf_parsers import PdfPageDecoder # type: ignore[import]
from docling_parse.pdf_parsers import RenderConfig # type: ignore[import]
from docling_parse.pdf_parsers import pdf_parser # type: ignore[import]
from docling_parse.pdf_parsers import threaded_pdf_parser # type: ignore[import]
from docling_parse.pdf_parsers import threaded_pdf_renderer # type: ignore[import]
from docling_parse.pdf_parsers import ( # type: ignore[import]
TIMING_KEY_CREATE_LINE_CELLS,
TIMING_KEY_CREATE_WORD_CELLS,
TIMING_KEY_DECODE_ANNOTS,
TIMING_KEY_DECODE_CONTENTS,
TIMING_KEY_DECODE_DIMENSIONS,
TIMING_KEY_DECODE_DOCUMENT,
TIMING_KEY_DECODE_FONTS,
TIMING_KEY_DECODE_FONTS_TOTAL,
TIMING_KEY_DECODE_GRPHS,
TIMING_KEY_DECODE_GRPHS_TOTAL,
TIMING_KEY_DECODE_PAGE,
TIMING_KEY_DECODE_RESOURCES,
TIMING_KEY_DECODE_XOBJECTS,
TIMING_KEY_DECODE_XOBJECTS_TOTAL,
TIMING_KEY_EXTRACT_ANNOTS_JSON,
TIMING_KEY_EXTRACT_DOC_ANNOTATIONS,
TIMING_KEY_PROCESS_DOCUMENT_FROM_BYTESIO,
TIMING_KEY_PROCESS_DOCUMENT_FROM_FILE,
TIMING_KEY_QPDF_PROCESS,
TIMING_KEY_ROTATE_CONTENTS,
TIMING_KEY_SANITISE_CONTENTS,
TIMING_KEY_SANITIZE_CELLS,
TIMING_KEY_SANITIZE_ORIENTATION,
TIMING_KEY_TO_JSON_PAGE,
TIMING_PREFIX_DECODE_FONT,
TIMING_PREFIX_DECODE_GRPH,
TIMING_PREFIX_DECODE_PAGE,
TIMING_PREFIX_DECODE_XOBJECT,
TIMING_PREFIX_DECODING_PAGE,
get_decode_page_timing_keys,
get_static_timing_keys,
is_static_timing_key,
)
# Configure logging
_log = logging.getLogger(__name__)
class PdfTocEntry(BaseModel):
    """One node of a PDF table of contents (a recursive structure).

    Keys that are not declared below are kept on the instance, because the
    model is configured with ``extra="allow"``.

    Attributes:
        title: Text of the entry.
        level: Nesting level in the hierarchy (0 for the top level), if given.
        page: Page number the entry points to, if given.
        children: Nested entries, or None when none were attached.
    """

    model_config = ConfigDict(extra="allow")

    title: str
    level: Optional[int] = None
    page: Optional[int] = None
    children: Optional[List["PdfTocEntry"]] = None
class PdfAnnotations(BaseModel):
    """Document-level annotations of a PDF.

    Assignments are validated (``validate_assignment=True``) and undeclared
    keys are kept on the instance (``extra="allow"``).

    Attributes:
        form: AcroForm data with the interactive form fields, as a raw dict.
            None when the document has no form.
        language: Document language code (e.g. 'en-US', 'fr-FR'). None when
            not specified.
        meta_xml: XMP metadata as an XML string. None when absent.
        table_of_contents: Outline/bookmark structure as a list of entries.
            None when the document has no table of contents.
    """

    model_config = ConfigDict(validate_assignment=True, extra="allow")

    form: Optional[Dict[str, Any]] = None
    language: Optional[str] = None
    meta_xml: Optional[str] = None
    table_of_contents: Optional[List[PdfTocEntry]] = None
class Timings(BaseModel):
    """Timing breakdown collected while parsing a PDF page.

    Attributes:
        data: Maps an operation name to its elapsed time in seconds, summed
            over all repetitions of that operation. Common keys include
            'decode_page', 'decode_dimensions', 'decode_resources',
            'decode_contents', 'decode_annots', 'create_word_cells' and
            'create_line_cells' (the last two only if requested).
        raw_data: Maps an operation name to the list of its individual
            elapsed times, for operations that run more than once (e.g.
            decoding several fonts).
    """

    model_config = ConfigDict(validate_assignment=True)

    data: Dict[str, float] = {}
    raw_data: Dict[str, List[float]] = {}

    def total(self) -> float:
        """Return the sum of all values stored in ``data``."""
        seconds = self.data.values()
        return sum(seconds)

    def get(self, key: str, default: float = 0.0) -> float:
        """Return the summed timing for ``key``, or ``default`` if unknown."""
        summed = self.data
        return summed.get(key, default)

    def get_all(self, key: str) -> List[float]:
        """Return every recorded timing for ``key`` (empty list if unknown)."""
        samples = self.raw_data
        return samples.get(key, [])

    def get_count(self, key: str) -> int:
        """Return how many timings were recorded for ``key``."""
        recorded = self.raw_data.get(key, [])
        return len(recorded)

    def __getitem__(self, key: str) -> float:
        """Return the summed timing for ``key``; raises KeyError if unknown."""
        return self.data[key]

    def keys(self):
        """Return a view of all operation names in ``data``."""
        return self.data.keys()

    def items(self):
        """Return a view of ``(name, seconds)`` pairs from ``data``."""
        return self.data.items()

    def get_static_timings(self) -> Dict[str, float]:
        """Return the entries of ``data`` whose key is a static timing key."""
        selected: Dict[str, float] = {}
        for name, seconds in self.data.items():
            if is_static_timing_key(name):
                selected[name] = seconds
        return selected

    def get_dynamic_timings(self) -> Dict[str, float]:
        """Return the entries of ``data`` whose key is not a static key."""
        selected: Dict[str, float] = {}
        for name, seconds in self.data.items():
            if is_static_timing_key(name):
                continue
            selected[name] = seconds
        return selected

    @staticmethod
    def static_keys() -> set:
        """Return all static timing key names (from the native module)."""
        return get_static_timing_keys()

    @staticmethod
    def decode_page_keys() -> List[str]:
        """Return the timing keys used by decode_page.

        Per the native helper this wraps, the keys are in order and exclude
        the global timer.
        """
        return get_decode_page_timing_keys()
class PdfDocument:
    """Handle to a single document registered in a ``pdf_parser`` under a key.

    Converted pages, the table of contents, the metadata and the annotations
    are cached on the instance once they have been built.
    """

    def __init__(
        self,
        parser: "pdf_parser",
        key: str,
        boundary_type: PdfPageBoundaryType = PdfPageBoundaryType.CROP_BOX,
    ):
        """Bind the handle to ``parser`` and start with empty caches.

        Args:
            parser: Native parser that holds the document.
            key: Key under which the document is registered in ``parser``.
            boundary_type: Page boundary used for the default decode config
                and recorded in the page geometry.
        """
        # Caches, filled on demand by the accessor methods.
        self._pages: Dict[int, SegmentedPdfPage] = {}
        self._toc: Optional[PdfTableOfContents] = None
        self._meta: Optional[PdfMetaData] = None
        self._annotations: Optional[PdfAnnotations] = None

        # Identity of the document inside the native parser.
        self._parser: pdf_parser = parser
        self._key = key
        self._boundary_type = boundary_type
def _default_config(self) -> DecodePageConfig:
    """Build the decode config used when a caller passes none.

    The page boundary is set from the document's boundary type and
    sanitization is switched off.
    """
    fallback = DecodePageConfig()
    boundary_value = self._boundary_type.value
    fallback.page_boundary = boundary_value
    fallback.do_sanitization = False
    return fallback
def is_loaded(self) -> bool:
    """Ask the parser whether it currently holds this document's key."""
    doc_key = self._key
    return self._parser.is_loaded(key=doc_key)
def unload(self) -> bool:
    """Clear the page cache and unload the document from the parser.

    Returns:
        The parser's ``unload_document`` result when the document was
        loaded, False when it was not loaded to begin with.
    """
    # The local cache is dropped regardless of the parser's state.
    self._pages.clear()
    if not self.is_loaded():
        return False
    return self._parser.unload_document(self._key)
def unload_pages(self, page_range: tuple[int, int]):
    """Unload the cached pages in the half-open range [start, stop).

    Only page numbers present in the page cache are touched; each one is
    unloaded in the parser and then removed from the cache.

    Args:
        page_range: ``(start, stop)``; ``stop`` itself is excluded.
    """
    first, past_last = page_range[0], page_range[1]
    for page_no in range(first, past_last):
        if page_no not in self._pages:
            continue
        # NOTE(review): the cache key is passed to the parser unchanged,
        # whereas get_page_decoder in this class is called with
        # ``page_no - 1`` -- confirm which indexing unload_document_page
        # expects.
        self._parser.unload_document_page(key=self._key, page=page_no)
        del self._pages[page_no]
def number_of_pages(self) -> int:
    """Return the page count the parser reports for this document.

    Raises:
        RuntimeError: If the document is not loaded.
    """
    if not self.is_loaded():
        raise RuntimeError("This document is not loaded.")
    return self._parser.number_of_pages(key=self._key)
def get_meta(self) -> Optional[PdfMetaData]:
    """Return the document metadata, building and caching it on first use.

    Returns:
        The cached ``PdfMetaData`` when one exists. Otherwise a new one
        built from the parser's metadata XML when that XML is a string,
        else None.

    Raises:
        RuntimeError: If nothing is cached and the document is not loaded.
    """
    cached = self._meta
    if cached is not None:
        return cached

    if not self.is_loaded():
        raise RuntimeError("This document is not loaded.")

    xml = self._parser.get_meta_xml(key=self._key)
    # A missing (None) or non-string payload leaves the cache empty.
    if isinstance(xml, str):
        self._meta = PdfMetaData(xml=xml)
        self._meta.initialise()
    return self._meta
def get_table_of_contents(self) -> Optional[PdfTableOfContents]:
    """Return the document outline, building and caching it on first use.

    The result is a synthetic root node with text ``"<root>"`` whose
    children are the converted top-level outline entries.

    Returns:
        The cached or newly built table of contents, or None when the
        parser reports no outline.

    Raises:
        RuntimeError: If the document is not loaded.
    """
    if not self.is_loaded():
        raise RuntimeError("This document is not loaded.")

    # Serve the cache before asking the parser again.
    if self._toc is not None:
        return self._toc

    toc = self._parser.get_table_of_contents(key=self._key)
    if toc is None:
        return None

    # Build the tree fully before caching it, so that a failure while
    # converting the children cannot leave a childless root in the cache.
    root = PdfTableOfContents(text="<root>")
    root.children = self._to_table_of_contents(toc=toc)
    self._toc = root
    return self._toc
def iterate_pages(
    self,
    *,
    config: Optional[DecodePageConfig] = None,
) -> Iterator[Tuple[int, SegmentedPdfPage]]:
    """Yield ``(page_no, page)`` for every page, with 1-based page numbers.

    Args:
        config: Decode configuration applied to every page. The document's
            default config is used when None.
    """
    effective = self._default_config() if config is None else config
    last_page = self.number_of_pages()
    for page_no in range(1, last_page + 1):
        yield page_no, self.get_page(page_no, config=effective)
def _to_table_of_contents(self, toc: dict) -> List[PdfTableOfContents]:
    """Convert raw outline items into ``PdfTableOfContents`` nodes.

    Each item is indexed by ``"title"`` and, when it has a ``"children"``
    key, converted recursively.
    """

    def convert(raw_item) -> PdfTableOfContents:
        # Build one node, descending only when the key is present.
        node = PdfTableOfContents(text=raw_item["title"])
        if "children" in raw_item:
            node.children = self._to_table_of_contents(toc=raw_item["children"])
        return node

    return [convert(raw_item) for raw_item in toc]
def _to_pdf_toc_entry(self, toc_list: List[Dict]) -> List[PdfTocEntry]:
    """Convert a list of raw TOC dicts into ``PdfTocEntry`` objects.

    A missing title becomes the empty string; missing level and page stay
    None. Children are converted recursively only when the item carries a
    non-empty ``"children"`` value.
    """
    entries = []
    for raw in toc_list:
        converted = PdfTocEntry(
            title=raw.get("title", ""),
            level=raw.get("level"),
            page=raw.get("page"),
        )
        nested = raw.get("children")
        if nested:
            converted.children = self._to_pdf_toc_entry(nested)
        entries.append(converted)
    return entries
def get_annotations(self) -> Optional[PdfAnnotations]:
    """Return the document annotations, building and caching them on first use.

    Returns:
        Optional[PdfAnnotations]: Object carrying the form, language,
        meta_xml and table_of_contents fields, or None when the parser
        reports no annotations.

    Raises:
        RuntimeError: If nothing is cached and the document is not loaded.
    """
    if self._annotations is not None:
        return self._annotations

    if not self.is_loaded():
        raise RuntimeError("This document is not loaded.")

    annots_dict = self._parser.get_annotations(key=self._key)
    if annots_dict is None:
        return self._annotations

    # The raw table of contents is a list of dicts; convert it only when
    # it is present and non-empty.
    raw_toc = annots_dict.get("table_of_contents")
    toc_entries = self._to_pdf_toc_entry(raw_toc) if raw_toc else None

    self._annotations = PdfAnnotations(
        form=annots_dict.get("form"),
        language=annots_dict.get("language"),
        meta_xml=annots_dict.get("meta_xml"),
        table_of_contents=toc_entries,
    )
    return self._annotations
def get_page(
    self,
    page_no: int,
    *,
    config: Optional[DecodePageConfig] = None,
) -> SegmentedPdfPage:
    """Return page ``page_no`` through the typed decoding path.

    Args:
        page_no: Page number (1-indexed).
        config: Decode configuration. The document's default config is
            used when None.
    """
    effective = self._default_config() if config is None else config
    return self._get_page_typed(page_no, config=effective)
def get_page_with_timings(
    self,
    page_no: int,
    *,
    config: Optional[DecodePageConfig] = None,
) -> Tuple[SegmentedPdfPage, Timings]:
    """Decode a page and return it together with its timing data.

    Unlike ``get_page()``, this neither reads nor fills the page cache, so
    the timings always come from a fresh decode.

    Args:
        page_no: Page number (1-indexed).
        config: Decode configuration. The document's default config is
            used when None.

    Returns:
        Tuple of (SegmentedPdfPage, Timings).

    Raises:
        ValueError: If ``page_no`` lies outside ``1..number_of_pages()``.
    """
    effective = self._default_config() if config is None else config

    if page_no < 1 or page_no > self.number_of_pages():
        raise ValueError(
            f"incorrect page_no: {page_no} for key={self._key} "
            f"(min:1, max:{self.number_of_pages()})"
        )

    return self._get_page_with_timings_typed(page_no, config=effective)
def _get_page_with_timings_typed(
    self,
    page_no: int,
    *,
    config: DecodePageConfig,
) -> Tuple[SegmentedPdfPage, Timings]:
    """Decode one page via the typed API and collect its timings.

    Args:
        page_no: Page number (1-indexed); the parser is called with
            ``page_no - 1``.
        config: Decode configuration.

    Raises:
        ValueError: If the parser returns no page decoder.
    """
    decoder = self._parser.get_page_decoder(
        key=self._key,
        page=page_no - 1,
        config=config,
    )
    if decoder is None:
        raise ValueError(f"Failed to decode page {page_no}")

    page = self._to_segmented_page_from_decoder(
        page_decoder=decoder,
        config=config,
    )

    # The decoder exposes both the summed and the per-call timings; both
    # are copied into plain dicts.
    summed = dict(decoder.get_timings())
    per_call = dict(decoder.get_timings_raw())
    return page, Timings(data=summed, raw_data=per_call)
def load_all_pages(self, config: Optional[DecodePageConfig] = None):
    """Fetch every page once through ``get_page``.

    Args:
        config: Decode configuration. The document's default config is
            used when None.
    """
    effective = self._default_config() if config is None else config
    page_count = self.number_of_pages()
    for zero_based in range(page_count):
        self.get_page(zero_based + 1, config=effective)
def _to_page_geometry_from_decoder(self, page_dim) -> PdfPageGeometry:
    """Convert a typed page-dimension object into a ``PdfPageGeometry``.

    Only the crop box, media box and angle are read from ``page_dim``. The
    page rectangle and the art, trim, crop and bleed boxes are all filled
    from the crop box. All coordinates use a bottom-left origin.
    """

    def as_bbox(corners) -> BoundingBox:
        # ``corners`` is indexed as (left, bottom, right, top).
        return BoundingBox(
            l=corners[0],
            b=corners[1],
            r=corners[2],
            t=corners[3],
            coord_origin=CoordOrigin.BOTTOMLEFT,
        )

    crop_bbox = page_dim.get_crop_bbox()
    media_bbox = page_dim.get_media_bbox()
    angle = page_dim.get_angle()

    # The crop box is the boundary used for the page rectangle.
    left, bottom, right, top = crop_bbox[0], crop_bbox[1], crop_bbox[2], crop_bbox[3]
    page_rect = BoundingRectangle(
        r_x0=left,
        r_y0=bottom,
        r_x1=right,
        r_y1=bottom,
        r_x2=right,
        r_y2=top,
        r_x3=left,
        r_y3=top,
        coord_origin=CoordOrigin.BOTTOMLEFT,
    )

    art_box = as_bbox(crop_bbox)
    media_box = as_bbox(media_bbox)
    crop_box = as_bbox(crop_bbox)

    return PdfPageGeometry(
        angle=angle,
        boundary_type=PdfPageBoundaryType(self._boundary_type),
        rect=page_rect,
        art_bbox=art_box,
        media_bbox=media_box,
        trim_bbox=crop_box,
        crop_bbox=crop_box,
        bleed_bbox=crop_box,
    )
def _to_cells_from_decoder(
    self, cells_container
) -> List[Union[PdfTextCell, TextCell]]:
    """Convert a typed cells container into a list of ``PdfTextCell`` objects.

    Each cell's ``index`` is its position in the container, and its text is
    used for both ``text`` and ``orig``.
    """

    def convert(position: int, cell) -> PdfTextCell:
        quad = BoundingRectangle(
            r_x0=cell.r_x0,
            r_y0=cell.r_y0,
            r_x1=cell.r_x1,
            r_y1=cell.r_y1,
            r_x2=cell.r_x2,
            r_y2=cell.r_y2,
            r_x3=cell.r_x3,
            r_y3=cell.r_y3,
        )
        if cell.left_to_right:
            direction = TextDirection.LEFT_TO_RIGHT
        else:
            direction = TextDirection.RIGHT_TO_LEFT
        return PdfTextCell(
            rect=quad,
            text=cell.text,
            orig=cell.text,
            font_key=cell.font_key,
            font_name=cell.font_name,
            widget=cell.widget,
            text_direction=direction,
            index=position,
            rendering_mode=cell.rendering_mode,
        )

    converted: List[Union[PdfTextCell, TextCell]] = [
        convert(position, cell) for position, cell in enumerate(cells_container)
    ]
    return converted
def _to_shapes_from_decoder(self, shapes_container) -> List[PdfShape]:
    """Convert a typed shapes container into a list of ``PdfShape`` objects.

    Each source shape exposes flat x/y coordinate sequences plus an index
    sequence read as consecutive ``(start, stop)`` pairs. Every pair
    selects the half-open slice ``[start, stop)`` of the coordinates and
    becomes one ``PdfShape``. All segments of a shape share that shape's
    graphics-state attributes.

    Args:
        shapes_container: Iterable of typed shape objects from the decoder.

    Returns:
        One ``PdfShape`` per index pair. ``index`` is the position of the
        source shape in the container and ``parent_id`` is the offset of
        the pair inside the index sequence (0, 2, 4, ...).

    Raises:
        IndexError: If a shape's index sequence has odd length.
    """
    result: List[PdfShape] = []
    for ind, shape in enumerate(shapes_container):
        indices = shape.get_i()
        if len(indices) == 0:
            # No segments: nothing to emit and no attributes to read.
            continue

        x_coords = shape.get_x()
        y_coords = shape.get_y()

        # These attributes are per shape, so read them once per shape
        # instead of once per segment.
        rgb_s = shape.get_rgb_stroking_ops()
        rgb_f = shape.get_rgb_filling_ops()
        has_graphics_state = shape.get_has_graphics_state()
        line_width = shape.get_line_width()
        miter_limit = shape.get_miter_limit()
        line_cap = shape.get_line_cap()
        line_join = shape.get_line_join()
        dash_phase = shape.get_dash_phase()
        dash_array = shape.get_dash_array()
        flatness = shape.get_flatness()

        for pair_offset in range(0, len(indices), 2):
            start: int = indices[pair_offset]
            stop: int = indices[pair_offset + 1]
            points: List[Coord2D] = [
                Coord2D(x_coords[k], y_coords[k]) for k in range(start, stop)
            ]
            result.append(
                PdfShape(
                    index=ind,
                    parent_id=pair_offset,
                    points=points,
                    has_graphics_state=has_graphics_state,
                    line_width=line_width,
                    miter_limit=miter_limit,
                    line_cap=line_cap,
                    line_join=line_join,
                    dash_phase=dash_phase,
                    # Fresh list and colour objects per segment, so
                    # segments never share mutable state.
                    dash_array=list(dash_array),
                    flatness=flatness,
                    rgb_stroking=ColorRGBA(r=rgb_s[0], g=rgb_s[1], b=rgb_s[2]),
                    rgb_filling=ColorRGBA(r=rgb_f[0], g=rgb_f[1], b=rgb_f[2]),
                )
            )
    return result
def _to_widgets_from_decoder(self, widgets_container) -> List[PdfWidget]:
    """Convert a typed widgets container into a list of ``PdfWidget`` objects.

    The rectangle is built from the widget's ``(x0, y0)`` and ``(x1, y1)``
    corners. Falsy text, description, field name and field type values are
    stored as None.
    """

    def convert(position: int, widget) -> PdfWidget:
        x0, y0, x1, y1 = widget.x0, widget.y0, widget.x1, widget.y1
        box = BoundingRectangle(
            r_x0=x0,
            r_y0=y0,
            r_x1=x1,
            r_y1=y0,
            r_x2=x1,
            r_y2=y1,
            r_x3=x0,
            r_y3=y1,
        )
        return PdfWidget(
            index=position,
            rect=box,
            widget_text=widget.text or None,
            widget_description=widget.description or None,
            widget_field_name=widget.field_name or None,
            widget_field_type=widget.field_type or None,
        )

    return [convert(position, widget) for position, widget in enumerate(widgets_container)]
def _to_hyperlinks_from_decoder(self, hyperlinks_container) -> List[PdfHyperlink]:
    """Convert a typed hyperlinks container into ``PdfHyperlink`` objects.

    The rectangle is built from the link's ``(x0, y0)`` and ``(x1, y1)``
    corners. A falsy URI is stored as None.
    """
    links: List[PdfHyperlink] = []
    for position, link in enumerate(hyperlinks_container):
        x0, y0, x1, y1 = link.x0, link.y0, link.x1, link.y1
        area = BoundingRectangle(
            r_x0=x0,
            r_y0=y0,
            r_x1=x1,
            r_y1=y0,
            r_x2=x1,
            r_y2=y1,
            r_x3=x0,
            r_y3=y1,
        )
        target = link.uri or None
        links.append(PdfHyperlink(index=position, rect=area, uri=target))
    return links
def _to_bitmap_resources_from_decoder(
    self, images_container
) -> List[BitmapResource]:
    """Convert a typed images container into ``BitmapResource`` objects.

    For each image the pixel data is extracted on a best-effort basis:

    * ``"jpeg"`` / ``"jp2"`` payloads are opened with PIL;
    * ``"raw"`` / ``"jbig2"`` payloads are read as a pixel buffer using the
      PIL mode and the pixel width/height reported by the image;
    * any other format, an empty payload or a failure leaves the resource
      as a placeholder without image data.

    Decoded images are converted to RGBA. The DPI is derived from the
    pixel width and the width of the image's bounding box (72 units per
    inch) and falls back to 72 when either width is not positive.

    Args:
        images_container: Iterable of typed image objects from the decoder.

    Returns:
        One ``BitmapResource`` per image, in container order. Its mode is
        ``EMBEDDED`` when pixel data was extracted, else ``PLACEHOLDER``.
    """
    result: List[BitmapResource] = []
    for ind, image in enumerate(images_container):
        rect = BoundingRectangle(
            r_x0=image.x0,
            r_y0=image.y0,
            r_x1=image.x1,
            r_y1=image.y0,
            r_x2=image.x1,
            r_y2=image.y1,
            r_x3=image.x0,
            r_y3=image.y1,
        )

        image_ref = None
        mode = ImageRefMode.PLACEHOLDER
        try:
            image_bytes = image.get_image_as_bytes()
            if image_bytes:
                fmt = image.get_image_format()
                pil_image: Optional[PILImage.Image] = None
                if fmt in ("jpeg", "jp2"):
                    pil_image = PILImage.open(BytesIO(image_bytes))
                elif fmt in ("raw", "jbig2"):
                    pil_mode = image.get_pil_mode()
                    w = image.image_width
                    h = image.image_height
                    if w > 0 and h > 0:
                        pil_image = PILImage.frombytes(
                            pil_mode, (w, h), image_bytes
                        )

                if pil_image is not None:
                    # Normalize to RGBA for consistent downstream handling.
                    if pil_image.mode != "RGBA":
                        pil_image = pil_image.convert("RGBA")

                    # Compute the DPI from pixel width and PDF bbox width.
                    bbox_width = abs(image.x1 - image.x0)
                    if bbox_width > 0 and image.image_width > 0:
                        dpi = int(round(image.image_width * 72.0 / bbox_width))
                    else:
                        dpi = 72

                    image_ref = ImageRef.from_pil(pil_image, dpi=dpi)
                    # Switch the mode only once the reference exists.
                    mode = ImageRefMode.EMBEDDED
        except Exception:
            # Extraction is best effort: fall back to a placeholder, but
            # keep the traceback in the debug log so the failure can be
            # diagnosed.
            _log.debug(
                "Failed to extract image data for bitmap, falling back to placeholder",
                exc_info=True,
            )

        result.append(
            BitmapResource(
                index=ind, rect=rect, uri=None, image=image_ref, mode=mode
            )
        )
    return result
def _to_segmented_page_from_decoder(
    self,
    page_decoder,
    *,
    config: DecodePageConfig,
) -> SegmentedPdfPage:
    """Assemble a ``SegmentedPdfPage`` from a typed page decoder.

    Character cells, shapes, widgets, hyperlinks, bitmaps and the page
    geometry are always converted. Word and line cells are added only when
    the decoder reports having them.

    Args:
        page_decoder: Typed decoder of a single page.
        config: Decode configuration; not read by this method.
    """
    chars = self._to_cells_from_decoder(page_decoder.get_char_cells())
    page_shapes = self._to_shapes_from_decoder(page_decoder.get_page_shapes())
    page_widgets = self._to_widgets_from_decoder(page_decoder.get_page_widgets())
    page_links = self._to_hyperlinks_from_decoder(
        page_decoder.get_page_hyperlinks()
    )
    bitmaps = self._to_bitmap_resources_from_decoder(
        page_decoder.get_page_images()
    )
    geometry = self._to_page_geometry_from_decoder(
        page_decoder.get_page_dimension()
    )

    page = SegmentedPdfPage(
        dimension=geometry,
        char_cells=chars,
        word_cells=[],
        textline_cells=[],
        has_chars=bool(chars),
        bitmap_resources=bitmaps,
        shapes=page_shapes,
        widgets=page_widgets,
        hyperlinks=page_links,
    )

    # Word- and line-level cells exist only if the decoder produced them.
    if page_decoder.has_word_cells():
        page.word_cells = self._to_cells_from_decoder(
            page_decoder.get_word_cells()
        )
        page.has_words = len(page.word_cells) > 0

    if page_decoder.has_line_cells():
        page.textline_cells = self._to_cells_from_decoder(
            page_decoder.get_line_cells()
        )
        page.has_lines = len(page.textline_cells) > 0

    return page
def _get_page_typed(
    self,
    page_no: int,
    *,
    config: DecodePageConfig,
) -> SegmentedPdfPage:
    """Return a page via the typed decoder API, using the page cache.

    A page that is already cached is returned as is, whatever ``config``
    is passed. Otherwise the page is decoded, converted, cached and
    returned.

    Args:
        page_no: Page number (1-indexed); the parser is called with
            ``page_no - 1``.
        config: Page decoding configuration.

    Returns:
        SegmentedPdfPage with the parsed page data.

    Raises:
        ValueError: If ``page_no`` is out of range or the parser returns
            no page decoder.
    """
    try:
        return self._pages[page_no]
    except KeyError:
        pass  # not cached yet; decode below

    if page_no < 1 or page_no > self.number_of_pages():
        raise ValueError(
            f"incorrect page_no: {page_no} for key={self._key} (min:1, max:{self.number_of_pages()})"
        )

    decoder = self._parser.get_page_decoder(
        key=self._key,
        page=page_no - 1,
        config=config,
    )
    if decoder is None:
        raise ValueError(f"Failed to decode page {page_no}")

    self._pages[page_no] = self._to_segmented_page_from_decoder(
        page_decoder=decoder,
        config=config,
    )
    return self._pages[page_no]
class DoclingPdfParser:
    """Front end around the native ``pdf_parser`` that hands out ``PdfDocument`` objects."""

    def __init__(self, loglevel: str = "fatal"):
        """Create the native parser with the given log level.

        Parameters:
            loglevel (str): Logging level as a string.
                One of ['fatal', 'error', 'warning', 'info']
        """
        self.parser = pdf_parser(level=loglevel)

    def set_loglevel(self, loglevel: str):
        """Set the log level using a string label.

        Parameters:
            loglevel (str): Logging level as a string.
                One of ['fatal', 'error', 'warning', 'info']
        """
        self.parser.set_loglevel_with_label(level=loglevel)

    def list_loaded_keys(self) -> List[str]:
        """List the keys of the loaded documents.

        Returns:
            List[str]: A list of keys for the currently loaded documents.
        """
        return self.parser.list_loaded_keys()

    def load(
        self,
        path_or_stream: Union[str, Path, BytesIO],
        lazy: bool = True,
        boundary_type: PdfPageBoundaryType = PdfPageBoundaryType.CROP_BOX,
        password: Optional[str] = None,
    ) -> PdfDocument:
        """Load a document from a path or an in-memory stream.

        Parameters:
            path_or_stream: File path (``str`` or ``Path``) or a ``BytesIO``
                holding the document. A stream is read from its current
                position to the end to compute the key and then rewound to
                position 0.
            lazy: When False, every page is parsed before returning.
            boundary_type: Page boundary handed to the ``PdfDocument``.
            password: Optional password for protected files.

        Returns:
            PdfDocument: Handle to the loaded document. Its key is
            ``"key=<path>"`` for files and ``"key=<sha256 hex digest>"``
            for streams.

        Raises:
            TypeError: If ``path_or_stream`` is not a str, Path or BytesIO.
            RuntimeError: If the parser reports that loading failed.
        """
        if isinstance(path_or_stream, str):
            path_or_stream = Path(path_or_stream)

        if isinstance(path_or_stream, Path):
            key = f"key={str(path_or_stream)}"  # use filepath as internal handle
            success = self._load_document(
                key=key, filename=str(path_or_stream), password=password
            )
        elif isinstance(path_or_stream, BytesIO):
            hasher = hashlib.sha256(usedforsecurity=False)
            while chunk := path_or_stream.read(8192):
                hasher.update(chunk)
            path_or_stream.seek(0)
            digest = hasher.hexdigest()
            key = f"key={digest}"  # use SHA-256 digest as internal handle
            # NOTE(review): ``password`` is not forwarded on this path;
            # ``_load_document_from_bytesio`` has no password parameter.
            success = self._load_document_from_bytesio(key=key, data=path_or_stream)
        else:
            # Without this branch an unsupported input fell through to an
            # UnboundLocalError on ``success``.
            raise TypeError(
                f"Expected str, Path, or BytesIO, got {type(path_or_stream)}"
            )

        if not success:
            raise RuntimeError(f"Failed to load document with key {key}")

        result_doc = PdfDocument(
            parser=self.parser, key=key, boundary_type=boundary_type
        )
        if not lazy:  # eagerly parse the pages at init time if desired
            result_doc.load_all_pages()
        return result_doc

    def _load_document(
        self, key: str, filename: str, password: Optional[str] = None
    ) -> bool:
        """Load a document by key and filename.

        Parameters:
            key (str): The unique key to identify the document.
            filename (str): The path to the document file to load; passed
                to the parser UTF-8 encoded.
            password (str, optional): Optional password for
                password-protected files.

        Returns:
            bool: True if the document was successfully loaded, False otherwise.
        """
        return self.parser.load_document(
            key=key, filename=filename.encode("utf8"), password=password
        )

    def _load_document_from_bytesio(self, key: str, data: BytesIO) -> bool:
        """Load a document by key from a BytesIO-like object.

        Parameters:
            key (str): The unique key to identify the document.
            data (BytesIO): A BytesIO-like object containing the document data.

        Returns:
            bool: True if the document was successfully loaded, False otherwise.
        """
        return self.parser.load_document_from_bytesio(key=key, bytes_io=data)
class ThreadedPdfParserConfig(BaseModel):
    """Settings for the threaded PDF parser.

    Attributes:
        loglevel: Logging level ('fatal', 'error', 'warning', 'info').
        threads: Number of worker threads used for parallel page decoding.
        max_concurrent_results: Maximum number of results buffered before
            workers pause.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    loglevel: str = "fatal"
    threads: int = 4
    max_concurrent_results: int = 32
class DoclingThreadedPdfParser:
    """Threaded PDF parser that decodes pages of several documents in parallel.

    Usage::

        parser_config = ThreadedPdfParserConfig(loglevel="fatal", threads=4, max_concurrent_results=32)
        decode_config = DecodePageConfig()
        parser = DoclingThreadedPdfParser(parser_config=parser_config, decode_config=decode_config)

        for source in sources:
            parser.load(source)

        while parser.has_tasks():
            task = parser.get_task()
            if task.success:
                page_decoder, timings = task.get()
            else:
                error_msg = task.error()
    """

    def __init__(
        self,
        parser_config: Optional[ThreadedPdfParserConfig] = None,
        decode_config: Optional[DecodePageConfig] = None,
    ):
        """Create the native threaded parser.

        Parameters:
            parser_config: Thread and logging settings; defaults are used
                when None.
            decode_config: Page decoding settings; a default
                ``DecodePageConfig`` is used when None.
        """
        settings = ThreadedPdfParserConfig() if parser_config is None else parser_config
        decoding = DecodePageConfig() if decode_config is None else decode_config

        self._parser = threaded_pdf_parser(
            loglevel=settings.loglevel,
            num_threads=settings.threads,
            max_concurrent_results=settings.max_concurrent_results,
            config=decoding,
        )

    def load(
        self,
        path_or_stream: Union[str, Path, BytesIO],
        password: Optional[str] = None,
    ) -> str:
        """Load a document for parallel processing.

        Parameters:
            path_or_stream: File path or BytesIO object. A stream is read
                to its end to compute the key and then rewound to 0.
            password: Optional password for protected files.

        Returns:
            str: The document key (``"key=<path>"`` for files,
            ``"key=<sha256 hex digest>"`` for streams).

        Raises:
            TypeError: If the input is not a str, Path or BytesIO.
            RuntimeError: If the parser reports that loading failed.
        """
        if isinstance(path_or_stream, str):
            path_or_stream = Path(path_or_stream)

        if isinstance(path_or_stream, Path):
            location = str(path_or_stream)
            key = f"key={location}"
            success = self._parser.load_document(
                key=key, filename=location.encode("utf8"), password=password
            )
        elif isinstance(path_or_stream, BytesIO):
            digest = hashlib.sha256(usedforsecurity=False)
            # Read in 8 KiB chunks until read() returns empty bytes.
            for chunk in iter(lambda: path_or_stream.read(8192), b""):
                digest.update(chunk)
            path_or_stream.seek(0)
            key = f"key={digest.hexdigest()}"
            success = self._parser.load_document_from_bytesio(
                key=key, bytes_io=path_or_stream, password=password
            )
        else:
            raise TypeError(
                f"Expected str, Path, or BytesIO, got {type(path_or_stream)}"
            )

        if success:
            return key
        raise RuntimeError(f"Failed to load document with key {key}")

    def has_tasks(self) -> bool:
        """Check whether results remain to be consumed.

        On the first call this builds the task queue and starts the worker
        threads.

        Returns:
            bool: True if there are remaining results to consume.
        """
        remaining = self._parser.has_tasks()
        return remaining

    def get_task(self) -> "PageDecodeResult":
        """Return the next completed page decode result.

        Blocks until a result is available.

        Returns:
            PageDecodeResult: Result with doc_key, page_number and success
            flag. Use ``task.get()`` for ``(PdfPageDecoder, timings)`` or
            ``task.error()`` for the error message.
        """
        next_result = self._parser.get_task()
        return next_result
# ---------------------------------------------------------------------------
# Threaded renderer
# ---------------------------------------------------------------------------
class ThreadedPdfRendererConfig(BaseModel):
    """Settings for the threaded PDF renderer.

    Attributes:
        loglevel: Logging level ('fatal', 'error', 'warning', 'info').
        threads: Number of worker threads used for parallel page rendering.
        max_concurrent_results: Maximum number of results buffered before
            workers pause.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    loglevel: str = "fatal"
    threads: int = 4
    max_concurrent_results: int = 32
class PdfPageRenderResult:
    """Wrapper around a raw C++ PageRenderResult that adds PIL image conversion.

    Attributes:
        doc_key: Document key the page belongs to.
        page_number: 0-indexed page number.
        success: Whether rendering succeeded.
    """

    def __init__(self, raw):
        """Keep the raw result and copy its identifying fields."""
        self._raw = raw
        self.doc_key: str = raw.doc_key
        self.page_number: int = raw.page_number
        self.success: bool = raw.success

    def error(self) -> str:
        """Return the error message of a failed render, else an empty string."""
        if self.success:
            return ""
        return self._raw.error_message

    def get(self) -> Tuple[PdfPageDecoder, Dict[str, float]]:
        """Return ``(page_decoder, timings)`` for the rendered page.

        Delegates to the raw result's ``get()``, so render results can be
        used like parse results when accessing the decoded page data.

        Raises:
            RuntimeError: If the task was not successful.
        """
        return self._raw.get()

    def get_image(self) -> Optional[PILImage.Image]:
        """Convert the rendered pixel data into a PIL RGBA image.

        Returns:
            PIL.Image.Image in RGBA mode, or None when rendering failed or
            the raw result carries no pixel data.
        """
        if not self.success:
            return None

        pixel_data = self._raw.get_image()
        if not pixel_data:
            return None

        # The shape is unpacked as (height, width, third); PIL expects the
        # size as (width, height).
        height, width, _ = self._raw.image_shape
        return PILImage.frombuffer(
            "RGBA", (width, height), pixel_data, "raw", "RGBA", 0, 1
        )
class DoclingThreadedPdfRenderer:
    """Threaded PDF renderer that decodes and renders pages of several documents in parallel.

    Each result carries both the decoded page data (via the page decoder)
    and the rendered RGBA image, produced in a single pass.

    Usage::

        render_config = RenderConfig()
        decode_config = DecodePageConfig()
        renderer_config = ThreadedPdfRendererConfig(threads=4)
        renderer = DoclingThreadedPdfRenderer(
            renderer_config=renderer_config,
            decode_config=decode_config,
            render_config=render_config,
        )

        for source in sources:
            renderer.load(source)

        while renderer.has_tasks():
            result = renderer.get_task()
            if result.success:
                image = result.get_image()  # PIL RGBA Image
            else:
                print(result.error())
    """

    def __init__(
        self,
        renderer_config: Optional[ThreadedPdfRendererConfig] = None,
        decode_config: Optional[DecodePageConfig] = None,
        render_config: Optional[RenderConfig] = None,
    ):
        """Create the native threaded renderer.

        Parameters:
            renderer_config: Thread and logging settings; defaults are used
                when None.
            decode_config: Page decoding settings; a default
                ``DecodePageConfig`` is used when None.
            render_config: Rendering settings; a default ``RenderConfig``
                is used when None.
        """
        # Defaults are created in the same order as the parameters.
        if renderer_config is None:
            renderer_config = ThreadedPdfRendererConfig()
        settings = renderer_config
        if decode_config is None:
            decode_config = DecodePageConfig()
        decoding = decode_config
        if render_config is None:
            render_config = RenderConfig()
        rendering = render_config

        self._renderer = threaded_pdf_renderer(
            loglevel=settings.loglevel,
            num_threads=settings.threads,
            max_concurrent_results=settings.max_concurrent_results,
            decode_config=decoding,
            render_config=rendering,
        )

    def load(
        self,
        path_or_stream: Union[str, Path, BytesIO],
        password: Optional[str] = None,
    ) -> str:
        """Load a document for parallel rendering.

        Parameters:
            path_or_stream: File path or BytesIO object. A stream is read
                to its end to compute the key and then rewound to 0.
            password: Optional password for protected files.

        Returns:
            str: The document key (``"key=<path>"`` for files,
            ``"key=<sha256 hex digest>"`` for streams).

        Raises:
            TypeError: If the input is not a str, Path or BytesIO.
            RuntimeError: If the renderer reports that loading failed.
        """
        if isinstance(path_or_stream, str):
            path_or_stream = Path(path_or_stream)

        if isinstance(path_or_stream, Path):
            file_name = str(path_or_stream)
            key = f"key={file_name}"
            loaded = self._renderer.load_document(
                key=key, filename=file_name.encode("utf8"), password=password
            )
        elif isinstance(path_or_stream, BytesIO):
            sha = hashlib.sha256(usedforsecurity=False)
            block = path_or_stream.read(8192)
            while block:
                sha.update(block)
                block = path_or_stream.read(8192)
            path_or_stream.seek(0)
            key = f"key={sha.hexdigest()}"
            loaded = self._renderer.load_document_from_bytesio(
                key=key, bytes_io=path_or_stream, password=password
            )
        else:
            raise TypeError(
                f"Expected str, Path, or BytesIO, got {type(path_or_stream)}"
            )

        if not loaded:
            raise RuntimeError(f"Failed to load document with key {key}")
        return key

    def has_tasks(self) -> bool:
        """Check whether results remain to be consumed.

        On the first call this builds the task queue and starts the worker
        threads.

        Returns:
            bool: True if there are remaining results to consume.
        """
        pending = self._renderer.has_tasks()
        return pending

    def get_task(self) -> PdfPageRenderResult:
        """Return the next completed page render result.

        Blocks until a result is available.

        Returns:
            PdfPageRenderResult: Wraps doc_key, page_number, success and
            get_image().
        """
        raw_result = self._renderer.get_task()
        return PdfPageRenderResult(raw_result)