Improve threaded docling-parse backend integration

- add dedicated threaded docling-parse backend options and wire CLI
  num_threads into parser_threads
- make the threaded backend honor parser_threads, falling back to
  AcceleratorOptions only when unset
- resolve threaded page ranges explicitly and clip open-ended or
  out-of-bounds requests against the actual document length
- cache page sizes in StandardPdfPipeline so failed-page recovery does
  not call load_page() on iterator-only threaded backends
- reject threaded docling-parse in VLM pipelines that still require
  ordered/random load_page() access
- extend backend, CLI, and compatibility tests for the new threaded
  backend behavior
- update the editable docling-parse lock entry

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
This commit is contained in:
Christoph Auer
2026-05-11 14:12:46 +02:00
parent d3ac439d1d
commit 82128f4df6
12 changed files with 242 additions and 49 deletions
-1
View File
@@ -1,5 +1,4 @@
fail_fast: true
minimum_pre_commit_version: "3.7.0"
default_stages: [pre-commit]
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
+42 -7
View File
@@ -1,5 +1,4 @@
import logging
import sys # sys.maxsize used for page-range default detection
from collections.abc import Iterable, Iterator
from io import BytesIO
from pathlib import Path
@@ -26,8 +25,12 @@ from docling.backend.managed_pdfium_backend import (
)
from docling.backend.pdf_backend import PdfDocumentBackend, PdfPageBackend
from docling.datamodel.accelerator_options import AcceleratorOptions
from docling.datamodel.backend_options import PdfBackendOptions
from docling.datamodel.backend_options import (
PdfBackendOptions,
ThreadedDoclingParseBackendOptions,
)
from docling.datamodel.base_models import Size
from docling.datamodel.settings import DEFAULT_PAGE_RANGE
from docling.utils.locks import pypdfium2_lock
if TYPE_CHECKING:
@@ -316,6 +319,38 @@ def _make_threaded_render_config() -> RenderConfig:
return config
def _resolve_threaded_page_numbers(
    path_or_stream: Union[BytesIO, Path],
    password: Optional[str],
    page_range: tuple[int, int],
) -> list[int] | None:
    """Turn a requested page range into an explicit list of page numbers.

    Args:
        path_or_stream: PDF source handed to pdfium to read the page count.
        password: Optional document password forwarded to pdfium.
        page_range: Inclusive ``(start_page, end_page)`` request.

    Returns:
        ``None`` when ``page_range`` equals ``DEFAULT_PAGE_RANGE`` (the
        document is not opened in that case); otherwise the inclusive run of
        page numbers with the end clipped to the document's page count. The
        list is empty when the range starts past the last page.
    """
    if page_range == DEFAULT_PAGE_RANGE:
        return None

    first_page, last_page = page_range

    # Open the document only long enough to learn how many pages it has.
    with pypdfium2_lock:
        probe = pdfium.PdfDocument(path_or_stream, password=password)
        try:
            total_pages = len(probe)
        finally:
            probe.close()

    # range() is already empty when first_page lies beyond the clipped end.
    last_page = min(last_page, total_pages)
    return list(range(first_page, last_page + 1))
def _resolve_threaded_parser_threads(options: PdfBackendOptions) -> int:
    """Choose the thread count for the threaded docling-parse parser.

    An explicit ``parser_threads`` on ``ThreadedDoclingParseBackendOptions``
    wins; in every other case the value comes from a freshly constructed
    ``AcceleratorOptions``.
    """
    explicit_threads = (
        options.parser_threads
        if isinstance(options, ThreadedDoclingParseBackendOptions)
        else None
    )
    if explicit_threads is None:
        return AcceleratorOptions().num_threads
    return explicit_threads
class ThreadedDoclingParsePageBackend(PdfPageBackend):
def __init__(self, result: PageParseResult):
self._result = result
@@ -407,16 +442,16 @@ class ThreadedDoclingParseDocumentBackend(PdfDocumentBackend):
password = (
self.options.password.get_secret_value() if self.options.password else None
)
start_page, end_page = in_doc.limits.page_range
requested_page_numbers = (
None if end_page == sys.maxsize else list(range(start_page, end_page + 1))
requested_page_numbers = _resolve_threaded_page_numbers(
self.path_or_stream,
password,
in_doc.limits.page_range,
)
parser_threads = AcceleratorOptions().num_threads
self.parser = DoclingThreadedPdfParser(
parser_config=ThreadedPdfParserConfig(
loglevel="fatal",
threads=parser_threads,
threads=_resolve_threaded_parser_threads(self.options),
render_config=_make_threaded_render_config(),
),
decode_config=_make_threaded_decode_config(),
+8 -17
View File
@@ -69,7 +69,10 @@ from docling.datamodel.asr_model_specs import (
WHISPER_TURBO_NATIVE,
AsrModelType,
)
from docling.datamodel.backend_options import PdfBackendOptions
from docling.datamodel.backend_options import (
PdfBackendOptions,
ThreadedDoclingParseBackendOptions,
)
from docling.datamodel.base_models import (
ConversionStatus,
FormatToExtensions,
@@ -755,12 +758,8 @@ def convert( # noqa: C901
ocr_options, TesseractOcrOptions | TesseractCliOcrOptions
):
ocr_options.psm = psm
accelerator_options = AcceleratorOptions(num_threads=num_threads, device=device)
# pipeline_options: PaginatedPipelineOptions
pipeline_options: PipelineOptions
format_options: dict[InputFormat, FormatOption] = {}
pdf_backend_options: PdfBackendOptions | None = PdfBackendOptions(
password=pdf_password
@@ -784,9 +783,7 @@ def convert( # noqa: C901
if isinstance(
pipeline_options.table_structure_options, TableStructureOptions
):
pipeline_options.table_structure_options.do_cell_matching = (
True # do_cell_matching
)
pipeline_options.table_structure_options.do_cell_matching = True
pipeline_options.table_structure_options.mode = table_mode
if _should_generate_export_images(
@@ -798,15 +795,15 @@ def convert( # noqa: C901
True # FIXME: to be deprecated in version 3
)
pipeline_options.images_scale = 2
# Normalize deprecated backend values
pdf_backend = normalize_pdf_backend(pdf_backend)
backend: Type[PdfDocumentBackend]
if pdf_backend == PdfBackend.DOCLING_PARSE:
backend = DoclingParseDocumentBackend # type: ignore
elif pdf_backend == PdfBackend.THREADED_DOCLING_PARSE:
backend = ThreadedDoclingParseDocumentBackend # type: ignore
pdf_backend_options = ThreadedDoclingParseBackendOptions(
password=pdf_password, parser_threads=num_threads
)
elif pdf_backend == PdfBackend.PYPDFIUM2:
backend = PyPdfiumDocumentBackend # type: ignore
else:
@@ -817,16 +814,12 @@ def convert( # noqa: C901
backend=backend, # pdf_backend
backend_options=pdf_backend_options,
)
# METS GBS options
mets_gbs_options = pipeline_options.model_copy()
mets_gbs_options.do_ocr = False
mets_gbs_format_option = PdfFormatOption(
pipeline_options=mets_gbs_options,
backend=MetsGbsDocumentBackend,
)
# SimplePipeline options
simple_format_option = ConvertPipelineOptions(
do_picture_description=enrich_picture_description,
do_picture_classification=enrich_picture_classes,
@@ -834,8 +827,6 @@ def convert( # noqa: C901
)
if artifacts_path is not None:
simple_format_option.artifacts_path = artifacts_path
# Use image-native backend for IMAGE to avoid pypdfium2 locking
image_format_option = PdfFormatOption(
pipeline_options=pipeline_options,
backend=ImageDocumentBackend,
+16
View File
@@ -131,6 +131,21 @@ class PdfBackendOptions(BaseBackendOptions):
password: Optional[SecretStr] = None
class ThreadedDoclingParseBackendOptions(PdfBackendOptions):
    """Options specific to the threaded docling-parse backend."""

    # Fixed tag identifying this options type; excluded from serialized
    # output and from the model repr.
    kind: Literal["threaded-docling-parse"] = Field(
        default="threaded-docling-parse", exclude=True, repr=False
    )
    # Explicit parser thread count; None means "not set".
    parser_threads: Optional[PositiveInt] = Field(
        default=None,
        description=(
            "Number of parser threads to use for the threaded docling-parse backend. "
            "If unset, the backend falls back to global accelerator thread settings."
        ),
    )
class MetsGbsBackendOptions(PdfBackendOptions):
"""Options specific to the METS-GBS document backend."""
@@ -219,6 +234,7 @@ BackendOptions = Annotated[
HTMLBackendOptions,
MarkdownBackendOptions,
PdfBackendOptions,
ThreadedDoclingParseBackendOptions,
MetsGbsBackendOptions,
MsExcelBackendOptions,
LatexBackendOptions,
@@ -20,6 +20,7 @@ if TYPE_CHECKING:
from docling_core.types.doc.page import SegmentedPage
from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.docling_parse_backend import ThreadedDoclingParseDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
from docling.datamodel.base_models import ConversionStatus, Page
from docling.datamodel.document import ConversionResult
@@ -52,6 +53,17 @@ from docling.utils.profiling import ProfilingScope, TimeRecorder
_log = logging.getLogger(__name__)
def _raise_if_unsupported_threaded_backend(
    backend: AbstractDocumentBackend, pipeline_name: str
) -> None:
    """Refuse to run a pipeline on the threaded docling-parse backend.

    Args:
        backend: Document backend about to be used by the pipeline.
        pipeline_name: Pipeline class name, used in the error message.

    Raises:
        RuntimeError: If ``backend`` is a ``ThreadedDoclingParseDocumentBackend``.
    """
    if not isinstance(backend, ThreadedDoclingParseDocumentBackend):
        return

    message = (
        f"{pipeline_name} does not support ThreadedDoclingParseDocumentBackend yet. "
        "It still requires ordered/random page access via load_page() and cannot "
        "consume iterator-only or out-of-order page delivery. Use StandardPdfPipeline instead."
    )
    raise RuntimeError(message)
class ThreadedLayoutVlmPipeline(BasePipeline):
"""Two-stage threaded pipeline: Layout Model → VLM Model."""
@@ -225,9 +237,10 @@ class ThreadedLayoutVlmPipeline(BasePipeline):
def _build_document(self, conv_res: ConversionResult) -> ConversionResult:
"""Build document using threaded layout+VLM pipeline."""
run_id = next(self._run_seq)
assert isinstance(conv_res.input._backend, PdfDocumentBackend)
backend = conv_res.input._backend
_raise_if_unsupported_threaded_backend(backend, self.__class__.__name__)
run_id = next(self._run_seq)
# Initialize pages
start_page, end_page = conv_res.input.limits.page_range
@@ -7,6 +7,7 @@ from PIL.Image import Image
from pydantic import BaseModel
from docling.backend.abstract_backend import PaginatedDocumentBackend
from docling.backend.docling_parse_backend import ThreadedDoclingParseDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
from docling.datamodel.base_models import ConversionStatus, ErrorItem, VlmStopReason
from docling.datamodel.document import InputDocument
@@ -29,6 +30,17 @@ from docling.utils.accelerator_utils import decide_device
_log = logging.getLogger(__name__)
def _raise_if_unsupported_threaded_backend(
    backend: PaginatedDocumentBackend, pipeline_name: str
) -> None:
    """Refuse to run a pipeline on the threaded docling-parse backend.

    Args:
        backend: Document backend about to be used by the pipeline.
        pipeline_name: Pipeline class name, used in the error message.

    Raises:
        RuntimeError: If ``backend`` is a ``ThreadedDoclingParseDocumentBackend``.
    """
    if not isinstance(backend, ThreadedDoclingParseDocumentBackend):
        return

    message = (
        f"{pipeline_name} does not support ThreadedDoclingParseDocumentBackend yet. "
        "It still requires ordered/random page access via load_page() and cannot "
        "consume iterator-only or out-of-order page delivery. Use StandardPdfPipeline instead."
    )
    raise RuntimeError(message)
class ExtractionVlmPipeline(BaseExtractionPipeline):
def __init__(self, pipeline_options: VlmExtractionPipelineOptions):
super().__init__(pipeline_options)
@@ -51,6 +63,10 @@ class ExtractionVlmPipeline(BaseExtractionPipeline):
template: Optional[ExtractionTemplateType] = None,
) -> ExtractionResult:
"""Extract data using the VLM model."""
backend = ext_res.input._backend
if isinstance(backend, PdfDocumentBackend):
_raise_if_unsupported_threaded_backend(backend, self.__class__.__name__)
try:
# Get images from input document using the backend
images = self._get_images_from_input(ext_res.input)
+5 -19
View File
@@ -474,6 +474,7 @@ class StandardPdfPipeline(ConvertPipeline):
super().__init__(pipeline_options)
self.pipeline_options: ThreadedPdfPipelineOptions = pipeline_options
self._run_seq = itertools.count(1) # deterministic, monotonic run ids
self._page_sizes_by_no: dict[int, Size] = {}
# initialise heavy models once
self._init_models()
@@ -645,6 +646,7 @@ class StandardPdfPipeline(ConvertPipeline):
The thread continues running until the blocking call completes, potentially holding
resources (e.g., pypdfium2_lock).
"""
self._page_sizes_by_no = {}
run_id = next(self._run_seq)
assert isinstance(conv_res.input._backend, PdfDocumentBackend)
backend = conv_res.input._backend
@@ -692,6 +694,7 @@ class StandardPdfPipeline(ConvertPipeline):
page._backend = page_backend
try:
page.size = page_backend.get_size()
self._page_sizes_by_no[page.page_no] = page.size
except Exception:
if page_backend.is_valid():
raise
@@ -957,26 +960,8 @@ class StandardPdfPipeline(ConvertPipeline):
if not missing_page_nos:
return
# Try to get size information from the backend for missing pages
backend = conv_res.input._backend
for page_no in sorted(missing_page_nos):
try:
# Attempt to get page size from backend
if isinstance(backend, PdfDocumentBackend):
page_backend = backend.load_page(page_no - 1)
try:
if page_backend.is_valid():
size = page_backend.get_size()
else:
# Use a default size if page backend is invalid
size = Size(width=0.0, height=0.0)
finally:
page_backend.unload()
else:
size = Size(width=0.0, height=0.0)
except Exception:
# If we can't get size, use default
size = Size(width=0.0, height=0.0)
size = self._page_sizes_by_no.get(page_no, Size(width=0.0, height=0.0))
# Add the failed page to the document's pages dict
conv_res.document.pages[page_no] = PageItem(
@@ -1004,6 +989,7 @@ class StandardPdfPipeline(ConvertPipeline):
return conv_res.status
def _unload(self, conv_res: ConversionResult) -> None:
self._page_sizes_by_no = {}
for p in conv_res.pages:
if p._backend is not None:
p._backend.unload()
+15
View File
@@ -26,6 +26,7 @@ from docling.backend.abstract_backend import (
AbstractDocumentBackend,
DeclarativeDocumentBackend,
)
from docling.backend.docling_parse_backend import ThreadedDoclingParseDocumentBackend
from docling.backend.html_backend import HTMLDocumentBackend
from docling.backend.md_backend import MarkdownDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
@@ -66,6 +67,17 @@ _log = logging.getLogger(__name__)
_DOCLANG_OPEN_RE = re.compile(r"<doclang(?:\s[^>]*)?>")
def _raise_if_unsupported_threaded_backend(
    backend: AbstractDocumentBackend, pipeline_name: str
) -> None:
    """Refuse to run a pipeline on the threaded docling-parse backend.

    Args:
        backend: Document backend about to be used by the pipeline.
        pipeline_name: Pipeline class name, used in the error message.

    Raises:
        RuntimeError: If ``backend`` is a ``ThreadedDoclingParseDocumentBackend``.
    """
    if not isinstance(backend, ThreadedDoclingParseDocumentBackend):
        return

    message = (
        f"{pipeline_name} does not support ThreadedDoclingParseDocumentBackend yet. "
        "It still requires ordered/random page access via load_page() and cannot "
        "consume iterator-only or out-of-order page delivery. Use StandardPdfPipeline instead."
    )
    raise RuntimeError(message)
class VlmPipeline(PaginatedPipeline):
def __init__(self, pipeline_options: VlmPipelineOptions):
super().__init__(pipeline_options)
@@ -195,6 +207,9 @@ class VlmPipeline(PaginatedPipeline):
images_scale = self.pipeline_options.images_scale
if images_scale is not None:
page._default_image_scale = images_scale
_raise_if_unsupported_threaded_backend(
conv_res.input._backend, self.__class__.__name__
)
page._backend = conv_res.input._backend.load_page(page.page_no - 1) # type: ignore
if page._backend is not None and page._backend.is_valid():
page.size = page._backend.get_size()
+111 -2
View File
@@ -1,3 +1,4 @@
import sys
from pathlib import Path
from typing import Any
@@ -11,6 +12,7 @@ from docling.backend.docling_parse_backend import (
ThreadedDoclingParseDocumentBackend,
ThreadedDoclingParsePageBackend,
)
from docling.datamodel.backend_options import ThreadedDoclingParseBackendOptions
from docling.datamodel.base_models import BoundingBox, InputFormat
from docling.datamodel.document import InputDocument
from docling.datamodel.settings import DocumentLimits
@@ -183,6 +185,18 @@ class _FakeThreadedParser:
return True
class _FakePdfiumDocument:
def __init__(self, path_or_stream, password=None) -> None:
self.path_or_stream = path_or_stream
self.password = password
def __len__(self) -> int:
return 5
def close(self) -> None:
return None
def test_threaded_backend_iterates_requested_pages_and_unloads(
test_doc_path, monkeypatch: pytest.MonkeyPatch
):
@@ -190,6 +204,10 @@ def test_threaded_backend_iterates_requested_pages_and_unloads(
"docling.backend.docling_parse_backend.DoclingThreadedPdfParser",
_FakeThreadedParser,
)
monkeypatch.setattr(
"docling.backend.docling_parse_backend.pdfium.PdfDocument",
_FakePdfiumDocument,
)
in_doc = InputDocument(
path_or_stream=test_doc_path,
@@ -213,13 +231,73 @@ def test_threaded_backend_iterates_requested_pages_and_unloads(
assert parser.unload_calls == ["doc-key"]
def test_threaded_backend_no_page_range_passes_none(
def test_threaded_backend_open_ended_page_range_is_clipped_to_document(
    test_doc_path, monkeypatch: pytest.MonkeyPatch
):
    """An open-ended range starting at page 2 ends at the fake document's page 5."""
    # Swap in the fake parser and the fake five-page pdfium document.
    replacements = {
        "docling.backend.docling_parse_backend.DoclingThreadedPdfParser": _FakeThreadedParser,
        "docling.backend.docling_parse_backend.pdfium.PdfDocument": _FakePdfiumDocument,
    }
    for target, fake in replacements.items():
        monkeypatch.setattr(target, fake)

    open_ended_limits = DocumentLimits(page_range=(2, sys.maxsize))
    in_doc = InputDocument(
        path_or_stream=test_doc_path,
        format=InputFormat.PDF,
        backend=ThreadedDoclingParseDocumentBackend,
        limits=open_ended_limits,
    )

    fake_parser = _FakeThreadedParser.created
    assert fake_parser is not None
    assert fake_parser.load_calls == [[2, 3, 4, 5]]

    in_doc._backend.unload()
def test_threaded_backend_bounded_page_range_is_clipped_to_document(
    test_doc_path, monkeypatch: pytest.MonkeyPatch
):
    """A bounded range that overshoots the document is clipped to its last page.

    The fake pdfium document reports five pages, so a request for pages 2-99
    must reach the parser as pages 2-5 only.
    """
    monkeypatch.setattr(
        "docling.backend.docling_parse_backend.DoclingThreadedPdfParser",
        _FakeThreadedParser,
    )
    monkeypatch.setattr(
        "docling.backend.docling_parse_backend.pdfium.PdfDocument",
        _FakePdfiumDocument,
    )

    in_doc = InputDocument(
        path_or_stream=test_doc_path,
        format=InputFormat.PDF,
        backend=ThreadedDoclingParseDocumentBackend,
        limits=DocumentLimits(page_range=(2, 99)),
    )

    parser = _FakeThreadedParser.created
    assert parser is not None
    # End page 99 exceeds the five-page fake document, so it is clipped to 5.
    assert parser.load_calls == [[2, 3, 4, 5]]

    in_doc._backend.unload()
def test_threaded_backend_no_page_range_passes_none_without_page_count_probe(
test_doc_path, monkeypatch: pytest.MonkeyPatch
):
class _FailingPdfiumDocument:
def __init__(self, path_or_stream, password=None) -> None:
raise AssertionError("page count should not be probed for default ranges")
monkeypatch.setattr(
"docling.backend.docling_parse_backend.DoclingThreadedPdfParser",
_FakeThreadedParser,
)
monkeypatch.setattr(
"docling.backend.docling_parse_backend.pdfium.PdfDocument",
_FailingPdfiumDocument,
)
in_doc = InputDocument(
path_or_stream=test_doc_path,
@@ -235,7 +313,38 @@ def test_threaded_backend_no_page_range_passes_none(
in_doc._backend.unload()
def test_threaded_backend_uses_accelerator_thread_count(
def test_threaded_backend_uses_backend_option_thread_count(
    test_doc_path, monkeypatch: pytest.MonkeyPatch
):
    """An explicit ``parser_threads`` option beats the accelerator thread count."""

    class _FakeAcceleratorOptions:
        # Deliberately differs from the 11 threads requested below.
        def __init__(self) -> None:
            self.num_threads = 7

    replacements = {
        "docling.backend.docling_parse_backend.DoclingThreadedPdfParser": _FakeThreadedParser,
        "docling.backend.docling_parse_backend.AcceleratorOptions": _FakeAcceleratorOptions,
    }
    for target, fake in replacements.items():
        monkeypatch.setattr(target, fake)

    explicit_options = ThreadedDoclingParseBackendOptions(parser_threads=11)
    in_doc = InputDocument(
        path_or_stream=test_doc_path,
        format=InputFormat.PDF,
        backend=ThreadedDoclingParseDocumentBackend,
        backend_options=explicit_options,
    )

    fake_parser = _FakeThreadedParser.created
    assert fake_parser is not None
    assert fake_parser.parser_config is not None
    assert fake_parser.parser_config.threads == 11

    in_doc._backend.unload()
def test_threaded_backend_uses_accelerator_thread_count_when_unset(
test_doc_path, monkeypatch: pytest.MonkeyPatch
):
class _FakeAcceleratorOptions:
+11
View File
@@ -6,6 +6,7 @@ from docling_core.types.doc import ImageRefMode
from typer.testing import CliRunner
from docling.cli.main import _should_generate_export_images, app
from docling.datamodel.backend_options import ThreadedDoclingParseBackendOptions
from docling.datamodel.base_models import InputFormat, OutputFormat
from docling.datamodel.pipeline_options import PdfBackend
from docling.document_converter import PdfFormatOption
@@ -131,6 +132,7 @@ def test_cli_accepts_threaded_docling_parse_backend(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
captured_backend: type[Any] | None = None
captured_backend_options: ThreadedDoclingParseBackendOptions | None = None
class _FakeDocumentConverter:
def __init__(
@@ -140,9 +142,14 @@ def test_cli_accepts_threaded_docling_parse_backend(
format_options: dict[InputFormat, PdfFormatOption],
) -> None:
nonlocal captured_backend
nonlocal captured_backend_options
pdf_option = format_options[InputFormat.PDF]
assert isinstance(pdf_option, PdfFormatOption)
captured_backend = pdf_option.backend
assert isinstance(
pdf_option.backend_options, ThreadedDoclingParseBackendOptions
)
captured_backend_options = pdf_option.backend_options
def convert_all(
self,
@@ -166,9 +173,13 @@ def test_cli_accepts_threaded_docling_parse_backend(
str(output),
"--pdf-backend",
PdfBackend.THREADED_DOCLING_PARSE.value,
"--num-threads",
"7",
],
)
assert result.exit_code == 0
assert captured_backend is not None
assert captured_backend.__name__ == "ThreadedDoclingParseDocumentBackend"
assert captured_backend_options is not None
assert captured_backend_options.parser_threads == 7
+2
View File
@@ -1,6 +1,8 @@
import time
from pathlib import Path
import pytest
from docling.backend.docling_parse_backend import (
DoclingParseDocumentBackend,
ThreadedDoclingParseDocumentBackend,
Generated
+2 -2
View File
@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.10, <4.0"
resolution-markers = [
"python_full_version >= '3.15' and sys_platform == 'win32'",
@@ -1222,7 +1222,7 @@ wheels = [
[[package]]
name = "docling-parse"
version = "5.10.1"
version = "5.11.0"
source = { editable = "../docling-parse" }
dependencies = [
{ name = "docling-core" },