Files
docling-parse/docling_parse/processing_dir.py
Michele Dolfi c9c452b6ee fix: setup hashlib for fips compliance (#123)
Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
2025-06-04 11:10:00 +02:00

172 lines
4.9 KiB
Python

import argparse
import glob
import hashlib
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from tabulate import tabulate
from docling_parse import pdf_parser_v2 # type: ignore[attr-defined]
# Set up the root logger once at import time: INFO threshold, timestamped records.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
@dataclass
class FileTask:
    """One unit of work placed on the processing queue: a single PDF file."""

    # Directory that was searched to find the file.
    folder_name: str
    # Local path where the file will be processed or saved.
    file_name: str
    # Hex digest used as the key under which the parser loads the document.
    file_hash: str
def _str_to_bool(value):
    """Convert a command-line token such as ``true``, ``0`` or ``no`` to a bool."""
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in {"1", "true", "t", "yes", "y", "on"}:
        return True
    if token in {"0", "false", "f", "no", "n", "off"}:
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_arguments(argv=None):
    """Parse arguments for directory parsing.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``.

    Returns:
        A tuple ``(directory, recursive, log_level, page_level_parsing)``
        where ``recursive`` and ``page_level_parsing`` are real booleans.
    """
    parser = argparse.ArgumentParser(
        description="Parse the PDF files found in a local directory."
    )

    parser.add_argument(
        "-d", "--directory", help="input directory with pdf files", required=True
    )

    # Without a converter argparse would hand back the raw string, and any
    # non-empty string (including "false") is truthy. ``nargs="?"`` with
    # ``const=True`` additionally allows the bare flag form (``-r``).
    parser.add_argument(
        "-r",
        "--recursive",
        help="recursively finding pdf-files",
        type=_str_to_bool,
        nargs="?",
        const=True,
        required=False,
        default=False,
    )

    parser.add_argument(
        "-p",
        "--page-level-parsing",
        help="parse pdf-files page-by-page",
        type=_str_to_bool,
        nargs="?",
        const=True,
        required=False,
        default=True,
    )

    # Restrict log-level to specific values
    parser.add_argument(
        "-l",
        "--log-level",
        type=str,
        choices=["info", "warning", "error", "fatal"],
        required=False,
        default="fatal",
        help="Log level [info, warning, error, fatal]",
    )

    args = parser.parse_args(argv)

    return args.directory, args.recursive, args.log_level, args.page_level_parsing
def fetch_files_from_disk(directory, recursive, task_queue):
    """Find the PDF files under ``directory`` and put one task per file on the queue.

    Args:
        directory: Directory to search for ``*.pdf`` files.
        recursive: When truthy, sub-directories are searched as well;
            otherwise only ``directory`` itself is scanned.
        task_queue: Queue that receives a ``FileTask`` per file, followed by
            a single ``None`` marking the end of the queue.
    """
    logging.info(f"Fetching file keys from disk: {directory}")

    if recursive:
        # "**" matches zero or more directory levels when recursive=True, so
        # files directly inside ``directory`` are still included.
        pattern = os.path.join(directory, "**", "*.pdf")
    else:
        pattern = os.path.join(directory, "*.pdf")

    for filename in sorted(glob.glob(pattern, recursive=bool(recursive))):
        file_name = str(Path(filename).resolve())

        # The digest is computed from the path string as returned by glob (not
        # from the file content) and serves only as an identifier, hence
        # usedforsecurity=False.
        hash_object = hashlib.sha256(filename.encode(), usedforsecurity=False)
        file_hash = hash_object.hexdigest()

        # Create a FileTask object
        task = FileTask(folder_name=directory, file_name=file_name, file_hash=file_hash)
        task_queue.put(task)

    # End-of-queue signal for the consumer.
    task_queue.put(None)
    logging.info("Done with queue")
def process_files_from_queue(file_queue: Queue, page_level: bool, loglevel: str):
    """Process files from the queue.

    Tasks are taken from ``file_queue`` until it is empty or a ``None``
    end-of-queue marker is read. Each task is loaded into a fresh
    ``pdf_parser_v2`` and parsed either page by page or as a whole.

    Args:
        file_queue: Queue of task objects exposing ``file_name`` and
            ``file_hash``, optionally terminated by ``None``.
        page_level: When true, parse every page separately; otherwise parse
            the whole document in one call.
        loglevel: Log level passed to the ``pdf_parser_v2`` constructor.

    Returns:
        A list of rows ``[filename, #-pages, page-number, success]``. Every
        file contributes exactly one document-level row (page-number ``-1``)
        and, in page-level mode, one additional row per page.
    """
    overview = []

    while not file_queue.empty():
        task = file_queue.get()
        if task is None:  # End of queue signal
            break

        logging.info(
            f"Queue-size [{file_queue.qsize()}], Processing task: {task.file_name}"
        )

        # Single document-level row per file. It starts out as a failure and
        # is completed in place, so a file never shows up both as succeeded
        # and as failed.
        doc_row = [str(task.file_name), -1, -1, False]
        overview.append(doc_row)

        try:
            parser = pdf_parser_v2(loglevel)
            parser.load_document(task.file_hash, str(task.file_name))

            try:
                num_pages = parser.number_of_pages(task.file_hash)
                logging.info(f" => #-pages of {task.file_name}: {num_pages}")
                doc_row[1] = num_pages

                if page_level:
                    # Parse page by page to minimize memory footprint
                    for page in range(num_pages):
                        fname = f"{task.file_name}-page-{page:03}.json"
                        try:
                            parser.parse_pdf_from_key_on_page(task.file_hash, page)
                            overview.append([fname, num_pages, page, True])
                        except Exception as exc:
                            # A broken page must not abort the remaining pages.
                            overview.append([fname, num_pages, page, False])
                            logging.error(
                                f"problem with parsing {task.file_name} on page {page}: {exc}"
                            )
                else:
                    parser.parse_pdf_from_key(task.file_hash)
            finally:
                # Unload the (QPDF) document and buffers, also when parsing failed
                parser.unload_document(task.file_hash)

            doc_row[3] = True

        except Exception as exc:
            logging.error(exc)

    return overview
def main():
    """Parse every PDF found in the requested directory and print an overview table."""
    directory, recursive, loglevel, page_level_parsing = parse_arguments()

    task_queue = Queue()
    fetch_files_from_disk(directory, recursive, task_queue)

    overview = process_files_from_queue(task_queue, page_level_parsing, loglevel)

    # Header order must follow the row layout produced by
    # process_files_from_queue: [filename, #-pages, page-number, success].
    print(tabulate(overview, headers=["filename", "#-pages", "page-number", "success"]))

    # The last column of each row is the success flag.
    failures = sum(1 for row in overview if not row[3])
    if failures:
        logging.error(f"{failures} of {len(overview)} entries failed to parse.")
    else:
        logging.info("All files processed successfully.")
# Run the command-line workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()