mirror of
https://github.com/docling-project/docling-parse.git
synced 2026-05-17 13:10:49 +00:00
b066b26215
Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
312 lines
10 KiB
Python
312 lines
10 KiB
Python
#!/usr/bin/env python
|
|
"""Tests for the threaded PDF parser."""
|
|
|
|
import glob
|
|
import os
|
|
|
|
import pytest
|
|
from docling_core.types.doc.page import PdfPageBoundaryType, SegmentedPdfPage
|
|
|
|
from docling_parse import pdf_parsers
|
|
from docling_parse.pdf_parser import (
|
|
DecodePageConfig,
|
|
DoclingPdfParser,
|
|
DoclingThreadedPdfParser,
|
|
ThreadedPdfParserConfig,
|
|
)
|
|
from tests.test_parse import (
|
|
GROUNDTRUTH_FOLDER,
|
|
REGRESSION_FOLDER,
|
|
verify_SegmentedPdfPage,
|
|
)
|
|
|
|
SAMPLE_PDF = "docs/dln-v1.pdf"
|
|
LARGE_SAMPLE_PDF = "docs/PDF32000_2008.pdf"
|
|
|
|
|
|
def _make_decode_config() -> DecodePageConfig:
|
|
config = DecodePageConfig()
|
|
config.page_boundary = "crop_box"
|
|
config.do_sanitization = False
|
|
config.keep_glyphs = True
|
|
config.keep_qpdf_warnings = False
|
|
return config
|
|
|
|
|
|
def test_threaded_raw_pybind_types_are_internal():
|
|
assert not hasattr(pdf_parsers, "PageDecodeResult")
|
|
assert not hasattr(pdf_parsers, "threaded_pdf_parser")
|
|
assert not hasattr(pdf_parsers, "PageRenderResult")
|
|
assert not hasattr(pdf_parsers, "threaded_pdf_renderer")
|
|
|
|
|
|
def test_threaded_reference_documents_from_filenames():
|
|
"""Load all regression PDFs, decode all pages in parallel, and verify against groundtruth."""
|
|
pdf_docs = sorted(glob.glob(REGRESSION_FOLDER))
|
|
assert len(pdf_docs) > 0, "len(pdf_docs)==0 -> nothing to test"
|
|
|
|
parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(
|
|
loglevel="fatal",
|
|
threads=4,
|
|
max_concurrent_results=32,
|
|
boundary_type=PdfPageBoundaryType.CROP_BOX,
|
|
),
|
|
decode_config=_make_decode_config(),
|
|
)
|
|
|
|
doc_keys = {pdf_doc_path: parser.load(pdf_doc_path) for pdf_doc_path in pdf_docs}
|
|
|
|
page_restrictions = {
|
|
"deep-mediabox-inheritance.pdf": [2],
|
|
"font_06.pdf": [1],
|
|
"font_07.pdf": [1],
|
|
"font_08.pdf": [1],
|
|
"font_09.pdf": [1],
|
|
"font_10.pdf": [1],
|
|
}
|
|
|
|
results: dict[str, dict[int, SegmentedPdfPage]] = {}
|
|
for result in parser.iterate_results():
|
|
assert result.doc_key != "", "doc_key should not be empty"
|
|
if result.success:
|
|
results.setdefault(result.doc_key, {})[result.page_number] = (
|
|
result.get_page()
|
|
)
|
|
else:
|
|
print(
|
|
f"Warning: task failed for {result.doc_key} page {result.page_number}: {result.error_message}"
|
|
)
|
|
|
|
for pdf_doc_path in pdf_docs:
|
|
key = doc_keys[pdf_doc_path]
|
|
assert key in results, f"No results found for {pdf_doc_path}"
|
|
|
|
rname = os.path.basename(pdf_doc_path)
|
|
|
|
for page_no, pred_page in sorted(results[key].items()):
|
|
if rname in page_restrictions and page_no not in page_restrictions[rname]:
|
|
continue
|
|
|
|
fname = os.path.join(
|
|
GROUNDTRUTH_FOLDER, rname + f".page_no_{page_no}.py.json"
|
|
)
|
|
|
|
if os.path.exists(fname):
|
|
true_page = SegmentedPdfPage.load_from_json(fname)
|
|
verify_SegmentedPdfPage(true_page, pred_page, filename=fname)
|
|
|
|
|
|
def test_threaded_single_document():
|
|
"""Test threaded parsing with a single document."""
|
|
filename = SAMPLE_PDF
|
|
|
|
parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(
|
|
loglevel="fatal",
|
|
threads=2,
|
|
max_concurrent_results=4,
|
|
boundary_type=PdfPageBoundaryType.CROP_BOX,
|
|
),
|
|
decode_config=_make_decode_config(),
|
|
)
|
|
|
|
key = parser.load(filename)
|
|
assert parser.page_count(key) > 0
|
|
|
|
count = 0
|
|
for result in parser.iterate_results():
|
|
assert result.success, (
|
|
f"Failed to decode page {result.page_number}: {result.error_message}"
|
|
)
|
|
assert result.doc_key == key
|
|
assert result.page_width > 0
|
|
assert result.page_height > 0
|
|
assert result.get_timings().total() > 0
|
|
count += 1
|
|
|
|
assert count == parser.page_count(key)
|
|
|
|
|
|
def test_threaded_results_match_sequential():
|
|
"""Verify threaded results match sequential results for the same documents."""
|
|
filenames = [SAMPLE_PDF]
|
|
decode_config = _make_decode_config()
|
|
|
|
seq_parser = DoclingPdfParser(loglevel="fatal")
|
|
sequential_pages: dict[str, dict[int, SegmentedPdfPage]] = {}
|
|
for filename in filenames:
|
|
pdf_doc = seq_parser.load(
|
|
path_or_stream=filename,
|
|
boundary_type=PdfPageBoundaryType.CROP_BOX,
|
|
lazy=True,
|
|
)
|
|
key = f"key={filename}"
|
|
sequential_pages[key] = {}
|
|
for page_no, page in pdf_doc.iterate_pages(config=decode_config):
|
|
sequential_pages[key][page_no] = page
|
|
|
|
threaded_parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(
|
|
loglevel="fatal",
|
|
threads=2,
|
|
max_concurrent_results=4,
|
|
boundary_type=PdfPageBoundaryType.CROP_BOX,
|
|
),
|
|
decode_config=decode_config,
|
|
)
|
|
for filename in filenames:
|
|
threaded_parser.load(filename)
|
|
|
|
threaded_pages: dict[str, dict[int, SegmentedPdfPage]] = {}
|
|
for result in threaded_parser.iterate_results():
|
|
assert result.success, f"Failed: {result.error_message}"
|
|
threaded_pages.setdefault(result.doc_key, {})[result.page_number] = (
|
|
result.get_page()
|
|
)
|
|
|
|
for key in sequential_pages:
|
|
assert key in threaded_pages, f"Missing key {key} in threaded results"
|
|
for page_no in sequential_pages[key]:
|
|
assert page_no in threaded_pages[key], f"Missing page {page_no} for {key}"
|
|
|
|
seq_page = sequential_pages[key][page_no]
|
|
thr_page = threaded_pages[key][page_no]
|
|
|
|
assert len(seq_page.char_cells) == len(thr_page.char_cells), (
|
|
f"char_cells count mismatch for {key} page {page_no}"
|
|
)
|
|
assert len(seq_page.word_cells) == len(thr_page.word_cells), (
|
|
f"word_cells count mismatch for {key} page {page_no}"
|
|
)
|
|
assert len(seq_page.textline_cells) == len(thr_page.textline_cells), (
|
|
f"textline_cells count mismatch for {key} page {page_no}"
|
|
)
|
|
assert len(seq_page.shapes) == len(thr_page.shapes), (
|
|
f"shapes count mismatch for {key} page {page_no}"
|
|
)
|
|
|
|
|
|
def test_threaded_backpressure():
|
|
"""Test that backpressure works with max_concurrent_results=1."""
|
|
filename = LARGE_SAMPLE_PDF
|
|
|
|
parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(
|
|
loglevel="fatal",
|
|
threads=2,
|
|
max_concurrent_results=1,
|
|
boundary_type=PdfPageBoundaryType.CROP_BOX,
|
|
),
|
|
decode_config=_make_decode_config(),
|
|
)
|
|
|
|
key = parser.load(filename)
|
|
count = sum(1 for result in parser.iterate_results() if result.success)
|
|
assert count == parser.page_count(key)
|
|
|
|
|
|
def test_threaded_single_thread():
|
|
"""Test threaded parsing with a single thread (sequential baseline)."""
|
|
filename = SAMPLE_PDF
|
|
|
|
parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(
|
|
loglevel="fatal",
|
|
threads=1,
|
|
max_concurrent_results=32,
|
|
boundary_type=PdfPageBoundaryType.CROP_BOX,
|
|
),
|
|
decode_config=_make_decode_config(),
|
|
)
|
|
|
|
key = parser.load(filename)
|
|
count = sum(1 for result in parser.iterate_results() if result.success)
|
|
assert count == parser.page_count(key)
|
|
|
|
|
|
def test_threaded_selected_pages_schedule_subset():
|
|
parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(
|
|
loglevel="fatal",
|
|
threads=2,
|
|
max_concurrent_results=4,
|
|
boundary_type=PdfPageBoundaryType.CROP_BOX,
|
|
),
|
|
decode_config=_make_decode_config(),
|
|
)
|
|
|
|
key = parser.load(LARGE_SAMPLE_PDF, page_numbers=[2, 1, 2])
|
|
|
|
assert parser.page_count(key) >= 2
|
|
assert parser.scheduled_page_count(key) == 2
|
|
|
|
emitted_pages = sorted(
|
|
result.page_number for result in parser.iterate_results() if result.success
|
|
)
|
|
assert emitted_pages == [1, 2]
|
|
|
|
|
|
def test_threaded_selected_pages_invalid_page_number():
|
|
parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(loglevel="fatal", threads=2),
|
|
decode_config=_make_decode_config(),
|
|
)
|
|
|
|
with pytest.raises(RuntimeError, match="Invalid page number"):
|
|
parser.load(SAMPLE_PDF, page_numbers=[9999])
|
|
|
|
|
|
def test_threaded_multiple_documents_with_different_subsets():
|
|
parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(
|
|
loglevel="fatal",
|
|
threads=4,
|
|
max_concurrent_results=8,
|
|
boundary_type=PdfPageBoundaryType.CROP_BOX,
|
|
),
|
|
decode_config=_make_decode_config(),
|
|
)
|
|
|
|
path_key = parser.load(LARGE_SAMPLE_PDF, page_numbers=[1, 2])
|
|
bytes_key = parser.load(SAMPLE_PDF, page_numbers=[1])
|
|
|
|
results_by_key: dict[str, list[int]] = {}
|
|
for result in parser.iterate_results():
|
|
assert result.success, result.error_message
|
|
results_by_key.setdefault(result.doc_key, []).append(result.page_number)
|
|
|
|
assert sorted(results_by_key[path_key]) == [1, 2]
|
|
assert sorted(results_by_key[bytes_key]) == [1]
|
|
assert parser.scheduled_page_count(path_key) == 2
|
|
assert parser.scheduled_page_count(bytes_key) == 1
|
|
|
|
|
|
def test_threaded_unload_after_consumption_is_idempotent():
|
|
parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(loglevel="fatal", threads=2),
|
|
decode_config=_make_decode_config(),
|
|
)
|
|
|
|
key = parser.load(SAMPLE_PDF, page_numbers=[1])
|
|
list(parser.iterate_results())
|
|
|
|
assert parser.unload(key) is True
|
|
assert parser.unload(key) is False
|
|
|
|
with pytest.raises(ValueError):
|
|
parser.page_count(key)
|
|
|
|
|
|
def test_threaded_unload_during_active_iteration_raises():
|
|
parser = DoclingThreadedPdfParser(
|
|
parser_config=ThreadedPdfParserConfig(loglevel="fatal", threads=2),
|
|
decode_config=_make_decode_config(),
|
|
)
|
|
|
|
key = parser.load(SAMPLE_PDF)
|
|
assert parser.has_tasks()
|
|
|
|
with pytest.raises(RuntimeError, match="threaded iteration is active"):
|
|
parser.unload(key)
|