mirror of
https://github.com/docling-project/docling-parse.git
synced 2026-05-17 13:10:49 +00:00
1f650dd412
Signed-off-by: Peter Staar <taa@zurich.ibm.com>
319 lines
10 KiB
Python
319 lines
10 KiB
Python
#!/usr/bin/env python
|
|
"""Tests for the threaded PDF parser."""
|
|
|
|
import glob
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from docling_core.types.doc.page import PdfPageBoundaryType, SegmentedPdfPage
|
|
|
|
from docling_parse.pdf_parser import (
|
|
DecodePageConfig,
|
|
DoclingPdfParser,
|
|
DoclingThreadedPdfParser,
|
|
PdfDocument,
|
|
ThreadedPdfParserConfig,
|
|
)
|
|
from tests.test_parse import (
|
|
GROUNDTRUTH_FOLDER,
|
|
REGRESSION_FOLDER,
|
|
verify_SegmentedPdfPage,
|
|
)
|
|
|
|
|
|
def _build_segmented_page_from_decoder(
    page_decoder, boundary_type=PdfPageBoundaryType.CROP_BOX
):
    """Turn a page decoder into a SegmentedPdfPage via PdfDocument's converter.

    Args:
        page_decoder: The decoder object obtained from a threaded parser task.
        boundary_type: Page boundary to convert with; its ``.value`` is also
            written into the decode configuration.

    Returns:
        The result of ``PdfDocument._to_segmented_page_from_decoder``.
    """
    decode_settings = DecodePageConfig()
    decode_settings.page_boundary = boundary_type.value
    decode_settings.do_sanitization = False
    decode_settings.keep_qpdf_warnings = False

    # ``__new__`` skips ``PdfDocument.__init__``: no real document is opened,
    # the instance only serves as a host for the conversion method.
    converter = PdfDocument.__new__(PdfDocument)
    converter._boundary_type = boundary_type

    segmented_page = converter._to_segmented_page_from_decoder(
        page_decoder=page_decoder, config=decode_settings
    )
    return segmented_page
|
|
|
|
|
|
def test_threaded_reference_documents_from_filenames():
    """Load all regression PDFs, decode all pages in parallel, and verify against groundtruth.

    Every file matched by ``REGRESSION_FOLDER`` is queued on one threaded
    parser. Successfully decoded pages are converted to segmented pages and
    compared with the stored groundtruth JSON when such a file exists. Failed
    tasks are only printed; a document without any successful page still
    fails the test through the "No results found" assertion.
    """

    pdf_docs = sorted(glob.glob(REGRESSION_FOLDER))
    assert len(pdf_docs) > 0, "len(pdf_docs)==0 -> nothing to test"

    decode_config = DecodePageConfig()
    decode_config.page_boundary = "crop_box"
    decode_config.do_sanitization = False
    decode_config.keep_glyphs = True
    decode_config.keep_qpdf_warnings = False

    parser_config = ThreadedPdfParserConfig(
        loglevel="fatal",
        threads=4,
        max_concurrent_results=32,
    )

    parser = DoclingThreadedPdfParser(
        parser_config=parser_config,
        decode_config=decode_config,
    )

    # Load all documents and remember the key the parser hands back for each
    # path, instead of re-deriving the parser's internal key format by hand.
    doc_keys = {pdf_doc_path: parser.load(pdf_doc_path) for pdf_doc_path in pdf_docs}

    # Page restrictions (same as sequential test)
    page_restrictions = {
        "deep-mediabox-inheritance.pdf": [2],
        "font_06.pdf": [1],
        "font_07.pdf": [1],
        "font_08.pdf": [1],
        "font_09.pdf": [1],
        "font_10.pdf": [1],
    }

    # Collect all results: doc_key -> {0-indexed page number -> segmented page}
    results = {}
    while parser.has_tasks():
        task = parser.get_task()

        assert task.doc_key != "", "doc_key should not be empty"

        if not task.success:
            # Some pages may fail, log but don't assert
            error_msg = task.error()
            print(
                f"Warning: task failed for {task.doc_key} page {task.page_number}: {error_msg}"
            )
            continue

        page_decoder, _timings = task.get()
        pred_page = _build_segmented_page_from_decoder(page_decoder)
        results.setdefault(task.doc_key, {})[task.page_number] = pred_page

    # Verify results against groundtruth (same logic as test_reference_documents_from_filenames)
    for pdf_doc_path in pdf_docs:
        key = doc_keys[pdf_doc_path]

        assert key in results, f"No results found for {pdf_doc_path}"

        rname = os.path.basename(pdf_doc_path)
        # None means "no restriction": every decoded page is checked.
        allowed_pages = page_restrictions.get(rname)

        for page_number, pred_page in sorted(results[key].items()):
            page_no = page_number + 1  # convert to 1-indexed for groundtruth filenames

            # Skip pages not in restrictions
            if allowed_pages is not None and page_no not in allowed_pages:
                continue

            fname = os.path.join(
                GROUNDTRUTH_FOLDER, rname + f".page_no_{page_no}.py.json"
            )

            # Pages without a groundtruth file have nothing to compare against.
            if os.path.exists(fname):
                true_page = SegmentedPdfPage.load_from_json(fname)
                verify_SegmentedPdfPage(true_page, pred_page, filename=fname)
|
|
|
|
|
|
def test_threaded_single_document():
    """Decode one PDF with the threaded parser and sanity-check every task.

    Each task must succeed, carry the key returned by ``load`` and provide a
    non-empty ``dict`` of timings.
    """
    pdf_path = "tests/data/regression/table_of_contents_01.pdf"

    page_settings = DecodePageConfig()
    page_settings.page_boundary = "crop_box"
    page_settings.do_sanitization = False
    page_settings.keep_glyphs = True
    page_settings.keep_qpdf_warnings = False

    threading_settings = ThreadedPdfParserConfig(
        loglevel="fatal", threads=2, max_concurrent_results=4
    )
    parser = DoclingThreadedPdfParser(
        parser_config=threading_settings,
        decode_config=page_settings,
    )

    expected_key = parser.load(pdf_path)

    pages_seen = 0
    while True:
        if not parser.has_tasks():
            break

        task = parser.get_task()
        assert task.success, f"Failed to decode page {task.page_number}: {task.error()}"
        assert task.doc_key == expected_key

        # Only the timings half of the (decoder, timings) pair is inspected.
        _, timings = task.get()
        assert isinstance(timings, dict)
        assert len(timings) > 0

        pages_seen += 1

    # The loop must have yielded at least one page.
    assert pages_seen > 0, "Should have processed at least one page"
|
|
|
|
|
|
def test_threaded_results_match_sequential():
    """Verify threaded results match sequential results for the same documents.

    Every regression PDF is parsed once with ``DoclingPdfParser`` and once
    with ``DoclingThreadedPdfParser``. For each page produced sequentially,
    the threaded run must provide the same page with identical numbers of
    char cells, word cells, textline cells and shapes.
    """

    # Sorted for a reproducible order; an empty match would otherwise let the
    # test pass without comparing anything.
    filenames = sorted(glob.glob("tests/data/regression/*.pdf"))
    assert len(filenames) > 0, "len(filenames)==0 -> nothing to test"

    decode_config = DecodePageConfig()
    decode_config.page_boundary = "crop_box"
    decode_config.do_sanitization = False
    decode_config.keep_glyphs = True
    decode_config.keep_qpdf_warnings = False

    # Sequential parsing: filename -> {1-indexed page number -> page}
    seq_parser = DoclingPdfParser(loglevel="fatal")
    sequential_pages = {}
    for filename in filenames:
        pdf_doc = seq_parser.load(
            path_or_stream=filename,
            boundary_type=PdfPageBoundaryType.CROP_BOX,
            lazy=True,
        )
        sequential_pages[filename] = {
            page_no: page
            for page_no, page in pdf_doc.iterate_pages(config=decode_config)
        }

    # Threaded parsing
    threaded_parser = DoclingThreadedPdfParser(
        parser_config=ThreadedPdfParserConfig(
            loglevel="fatal", threads=2, max_concurrent_results=4
        ),
        decode_config=decode_config,
    )
    # Keep the key returned by load() so both runs can be matched without
    # re-deriving the parser's key format by hand.
    doc_keys = {filename: threaded_parser.load(filename) for filename in filenames}

    # doc_key -> {1-indexed page number -> page}
    threaded_pages = {}
    while threaded_parser.has_tasks():
        task = threaded_parser.get_task()
        assert task.success, f"Failed: {task.error()}"

        page_decoder, _timings = task.get()
        pred_page = _build_segmented_page_from_decoder(page_decoder)

        # Task page numbers are 0-indexed; shift to the sequential 1-indexed form.
        threaded_pages.setdefault(task.doc_key, {})[task.page_number + 1] = pred_page

    # Compare
    compared_fields = ("char_cells", "word_cells", "textline_cells", "shapes")
    for filename, seq_doc_pages in sequential_pages.items():
        key = doc_keys[filename]
        assert key in threaded_pages, f"Missing key {key} in threaded results"
        thr_doc_pages = threaded_pages[key]

        for page_no, seq_page in seq_doc_pages.items():
            assert page_no in thr_doc_pages, f"Missing page {page_no} for {key}"
            thr_page = thr_doc_pages[page_no]

            # Verify key fields match
            for field in compared_fields:
                assert len(getattr(seq_page, field)) == len(
                    getattr(thr_page, field)
                ), f"{field} count mismatch for {key} page {page_no}"
|
|
|
|
|
|
def test_threaded_backpressure():
    """Run the threaded parser with ``max_concurrent_results=1``.

    Every task must succeed and at least one page must be delivered.
    """
    pdf_path = "tests/data/regression/table_of_contents_01.pdf"

    page_settings = DecodePageConfig()
    page_settings.page_boundary = "crop_box"
    page_settings.do_sanitization = False
    page_settings.keep_glyphs = True
    page_settings.keep_qpdf_warnings = False

    # max_concurrent_results=1 is the "very tight backpressure" setting
    # this test exists for.
    threading_settings = ThreadedPdfParserConfig(
        loglevel="fatal",
        threads=2,
        max_concurrent_results=1,
    )
    parser = DoclingThreadedPdfParser(
        parser_config=threading_settings,
        decode_config=page_settings,
    )

    parser.load(pdf_path)

    # Tasks are consumed strictly one at a time; none is kept around.
    delivered = 0
    while True:
        if not parser.has_tasks():
            break
        task = parser.get_task()
        assert task.success, f"Failed: {task.error()}"
        delivered += 1

    assert delivered > 0
|
|
|
|
|
|
def test_threaded_single_thread():
    """Run the threaded parser with ``threads=1`` (sequential baseline).

    Every task must succeed and at least one page must be delivered.
    """
    pdf_path = "tests/data/regression/font_01.pdf"

    decode_config = DecodePageConfig()
    # Same four options as the other threaded tests, applied in the same order.
    for option, value in (
        ("page_boundary", "crop_box"),
        ("do_sanitization", False),
        ("keep_glyphs", True),
        ("keep_qpdf_warnings", False),
    ):
        setattr(decode_config, option, value)

    single_thread_config = ThreadedPdfParserConfig(
        loglevel="fatal",
        threads=1,
        max_concurrent_results=32,
    )
    parser = DoclingThreadedPdfParser(
        parser_config=single_thread_config,
        decode_config=decode_config,
    )

    parser.load(pdf_path)

    finished = 0
    while True:
        if not parser.has_tasks():
            break
        task = parser.get_task()
        assert task.success, f"Failed: {task.error()}"
        finished += 1

    assert finished > 0
|