mirror of
https://github.com/docling-project/docling-parse.git
synced 2026-05-17 13:10:49 +00:00
c9c452b6ee
Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
172 lines
4.9 KiB
Python
172 lines
4.9 KiB
Python
import argparse
|
|
import glob
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from queue import Queue
|
|
|
|
from tabulate import tabulate
|
|
|
|
from docling_parse import pdf_parser_v2 # type: ignore[attr-defined]
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class FileTask:
|
|
folder_name: str
|
|
|
|
file_name: str # Local path where the file will be processed or saved
|
|
file_hash: str
|
|
|
|
|
|
def parse_arguments():
|
|
"""Parse arguments for directory parsing."""
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="Process S3 files using multithreading."
|
|
)
|
|
parser.add_argument(
|
|
"-d", "--directory", help="input directory with pdf files", required=True
|
|
)
|
|
parser.add_argument(
|
|
"-r",
|
|
"--recursive",
|
|
help="recursively finding pdf-files",
|
|
required=False,
|
|
default=False,
|
|
)
|
|
parser.add_argument(
|
|
"-p",
|
|
"--page-level-parsing",
|
|
help="parse pdf-files page-by-page",
|
|
required=False,
|
|
default=True,
|
|
)
|
|
|
|
# Restrict log-level to specific values
|
|
parser.add_argument(
|
|
"-l",
|
|
"--log-level",
|
|
type=str,
|
|
choices=["info", "warning", "error", "fatal"],
|
|
required=False,
|
|
default="fatal",
|
|
help="Log level [info, warning, error, fatal]",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
return args.directory, args.recursive, args.log_level, args.page_level_parsing
|
|
|
|
|
|
def fetch_files_from_disk(directory, recursive, task_queue):
|
|
"""Recursively fetch files from disk and add them to the queue."""
|
|
logging.info(f"Fetching file keys from disk: {directory}")
|
|
|
|
for filename in sorted(glob.glob(os.path.join(directory, "*.pdf"))):
|
|
|
|
file_name = str(Path(filename).resolve())
|
|
|
|
hash_object = hashlib.sha256(filename.encode(), usedforsecurity=False)
|
|
file_hash = hash_object.hexdigest()
|
|
|
|
# Create a FileTask object
|
|
task = FileTask(folder_name=directory, file_name=file_name, file_hash=file_hash)
|
|
task_queue.put(task)
|
|
|
|
task_queue.put(None)
|
|
logging.info("Done with queue")
|
|
|
|
|
|
def process_files_from_queue(file_queue: Queue, page_level: bool, loglevel: str):
|
|
"""Process files from the queue."""
|
|
|
|
overview = []
|
|
|
|
while not file_queue.empty():
|
|
|
|
task = file_queue.get()
|
|
if task is None: # End of queue signal
|
|
break
|
|
|
|
logging.info(
|
|
f"Queue-size [{file_queue.qsize()}], Processing task: {task.file_name}"
|
|
)
|
|
|
|
try:
|
|
parser = pdf_parser_v2(loglevel)
|
|
|
|
parser.load_document(task.file_hash, str(task.file_name))
|
|
|
|
num_pages = parser.number_of_pages(task.file_hash)
|
|
logging.info(f" => #-pages of {task.file_name}: {num_pages}")
|
|
|
|
overview.append([str(task.file_name), num_pages, -1, True])
|
|
|
|
if page_level:
|
|
# Parse page by page to minimize memory footprint
|
|
for page in range(0, num_pages):
|
|
fname = f"{task.file_name}-page-{page:03}.json"
|
|
|
|
try:
|
|
json_doc = parser.parse_pdf_from_key_on_page(
|
|
task.file_hash, page
|
|
)
|
|
|
|
"""
|
|
with open(os.path.join(directory, fname), "w") as fw:
|
|
fw.write(json.dumps(json_doc, indent=2))
|
|
"""
|
|
|
|
overview.append([fname, num_pages, page, True])
|
|
except Exception as exc:
|
|
overview.append([fname, num_pages, page, False])
|
|
logging.error(
|
|
f"problem with parsing {task.file_name} on page {page}: {exc}"
|
|
)
|
|
else:
|
|
|
|
parser.parse_pdf_from_key(task.file_hash)
|
|
|
|
"""
|
|
# with open(os.path.join(task.folder_name, f"{task.file_name}.json"), "w") as fw:
|
|
with open(f"{task.file_name}.json", "w") as fw:
|
|
fw.write(json.dumps(json_doc, indent=2))
|
|
"""
|
|
|
|
overview.append([str(task.file_name), num_pages, -1, True])
|
|
|
|
# Unload the (QPDF) document and buffers
|
|
parser.unload_document(task.file_hash)
|
|
|
|
except Exception as exc:
|
|
logging.error(exc)
|
|
overview.append([str(task.file_name), -1, -1, False])
|
|
|
|
return overview
|
|
|
|
|
|
def main():
|
|
|
|
directory, recursive, loglevel, page_level_parsing = parse_arguments()
|
|
|
|
task_queue = Queue()
|
|
|
|
fetch_files_from_disk(directory, recursive, task_queue)
|
|
|
|
overview = process_files_from_queue(task_queue, page_level_parsing, loglevel)
|
|
|
|
print(tabulate(overview, headers=["filename", "success", "page-number", "#-pages"]))
|
|
|
|
logging.info("All files processed successfully.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|