Files

790 lines
38 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Calibre-Web Automated fork of Calibre-Web
# Copyright (C) 2018-2026 Calibre-Web contributors
# Copyright (C) 2024-2026 Calibre-Web Automated contributors
# SPDX-License-Identifier: GPL-3.0-or-later
# See CONTRIBUTORS for full list of authors.
import argparse
import atexit
import json
import os
import re
import sqlite3
import subprocess
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
import unicodedata
from cwa_db import CWA_DB
# Resolve ``get_valid_filename_shared``: take it from the ``cps`` package when it can be
# imported, otherwise define a local stand-in with the same signature.
try:
    from cps.utils.filename_sanitizer import get_valid_filename_shared
except ModuleNotFoundError:
    # Add project root (parent of scripts/) to sys.path and retry
    this_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.abspath(os.path.join(this_dir, '..'))
    if project_root not in sys.path:
        sys.path.insert(0, project_root)
    try:
        from cps.utils.filename_sanitizer import get_valid_filename_shared  # type: ignore
    except Exception:
        # Inline fallback: minimal mirror of CW behavior used only if import fails
        import re as _re
        try:
            import unidecode as _unidecode  # type: ignore
        except Exception:
            # Transliteration is skipped when the optional package is missing.
            _unidecode = None

        # Matches whitespace / zero-width characters (U+200B..U+200D, U+FEFF) at either end.
        _ZW_TRIM_RE = _re.compile(r"(^[\s\u200B-\u200D\ufeff]+)|([\s\u200B-\u200D\ufeff]+$)")

        def _strip_ws(text: str) -> str:
            """Return *text* without leading/trailing whitespace or zero-width characters."""
            return _ZW_TRIM_RE.sub("", text)

        def get_valid_filename_shared(value: str,
                                      replace_whitespace: bool = True,
                                      chars: int = 128,
                                      unicode_filename: bool = False) -> str:
            """Turn *value* into a string that is safe to use as a file or folder name.

            Args:
                value: Text to sanitize; non-strings are converted with ``str()``
                    and ``None`` becomes the empty string.
                replace_whitespace: When true, runs of ``*+:\\"/<>?`` become ``_``
                    and runs of ``|`` become ``,``.
                chars: Maximum length of the result, measured in UTF-8 bytes.
                unicode_filename: When true (and ``unidecode`` is installed), the
                    text is transliterated with ``unidecode``.

            Returns:
                The sanitized name.

            Raises:
                ValueError: If nothing is left after sanitizing.
            """
            if not isinstance(value, str):
                value = "" if value is None else str(value)
            # A single trailing dot is replaced by an underscore.
            if value.endswith('.'):
                value = f"{value[:-1]}_"
            for separator in ("/", ":"):
                value = value.replace(separator, "_")
            value = value.strip('\0')
            if unicode_filename and _unidecode is not None:
                value = _unidecode.unidecode(value)
            if replace_whitespace:
                for pattern, substitute in ((r'[*+:\\\"/<>?]+', '_'), (r'[|]+', ',')):
                    value = _re.sub(pattern, substitute, value, flags=_re.U)
            # Truncate on the encoded bytes; a multi-byte character cut in half is dropped.
            truncated = value.encode('utf-8')[:chars]
            value = _strip_ws(truncated.decode('utf-8', errors='ignore'))
            if value:
                return value
            raise ValueError("Filename cannot be empty")
# Optional transliteration backend; ``unidecode`` is None when the package is unavailable.
try:
    from unidecode import unidecode  # transliteration used when unicode-filename mode is on
except Exception:
    unidecode = None

# Global Variables
dirs_json = "/app/calibre-web-automated/dirs.json"
change_logs_dir = "/app/calibre-web-automated/metadata_change_logs"
metadata_temp_dir = "/app/calibre-web-automated/metadata_temp"

# Location of the single-instance lock file (inside the system temp directory).
_LOCK_PATH = os.path.join(tempfile.gettempdir(), 'cover_enforcer.lock')

# Creates a lock file unless one already exists meaning an instance of the script is
# already running, then the script is closed, the user is notified and the program
# exits with code 2
try:
    # Mode 'x' fails with FileExistsError if the file is already there, which makes
    # the existence check and the creation a single step.
    with open(_LOCK_PATH, 'x'):
        pass
except FileExistsError:
    print("[cover-metadata-enforcer]: CANCELLING... cover-metadata-enforcer was initiated but is already running")
    sys.exit(2)


# Defining function to delete the lock on script exit
def removeLock():
    """Delete the lock file; do nothing if it has already been removed."""
    try:
        os.remove(_LOCK_PATH)
    except FileNotFoundError:
        # Already gone (removed manually or by an earlier call) - nothing left to clean up.
        pass


# Will automatically run when the script exits
atexit.register(removeLock)
class Book:
def __init__(self, book_dir: str, file_path: str):
self.book_dir: str = book_dir
self.file_path: str = file_path
self.calibre_library = self.get_calibre_library()
self.file_format: str = Path(file_path).suffix.replace('.', '')
self.timestamp: str = self.get_time()
self.book_id: str = (list(re.findall(r"\(\d*\)", book_dir))[-1])[1:-1]
self.book_title, self.author_name, self.title_author = self.get_title_and_author()
self.calibre_env = os.environ.copy()
# Enables Calibre plugins to be used from /config/plugins
self.calibre_env["HOME"] = "/config"
# Gets split library info from app.db and sets library dir to the split dir if split library is enabled
self.split_library = self.get_split_library()
if self.split_library:
self.calibre_library = self.split_library["split_path"]
self.calibre_env['CALIBRE_OVERRIDE_DATABASE_PATH'] = os.path.join(self.split_library["db_path"], "metadata.db")
self.cover_path = book_dir + '/cover.jpg'
self.old_metadata_path = book_dir + '/metadata.opf'
self.new_metadata_path = self.get_new_metadata_path()
self.log_info = None
def get_split_library(self) -> dict[str, str] | None:
"""Checks whether or not the user has split library enabled. Returns None if they don't and the path of the Split Library location if True."""
con = sqlite3.connect("/config/app.db", timeout=60)
cur = con.cursor()
split_library = cur.execute('SELECT config_calibre_split FROM settings;').fetchone()[0]
if split_library:
split_path = cur.execute('SELECT config_calibre_split_dir FROM settings;').fetchone()[0]
db_path = cur.execute('SELECT config_calibre_dir FROM settings;').fetchone()[0]
con.close()
return {
"split_path": split_path,
"db_path": db_path
}
else:
con.close()
return None
def get_calibre_library(self) -> str:
"""Gets Calibre-Library location from dirs.json"""
with open(dirs_json, 'r') as f:
dirs = json.load(f)
return dirs['calibre_library_dir'] # Returns without / on the end
def get_time(self) -> str:
now = datetime.now()
return now.strftime('%Y-%m-%d %H:%M:%S')
def get_title_and_author(self) -> tuple[str, str, str]:
title_author = self.file_path.split('/')[-1].split(f'.{self.file_format}')[0]
book_title = title_author.split(f" - {title_author.split(' - ')[-1]}")[0]
author_name = title_author.split(' - ')[-1]
return book_title, author_name, title_author
def get_new_metadata_path(self) -> str:
"""Uses the export function of the calibredb utility to export any new metadata for the given book to metadata_temp, and returns the path to the new metadata.opf"""
# Add retry logic with exponential backoff to handle database locks
max_retries = 3
for attempt in range(max_retries):
try:
# Add small delay before first attempt to allow other operations to complete
if attempt > 0:
delay = 2 ** attempt # Exponential backoff: 2s, 4s
print(f"[cover-metadata-enforcer] Retrying calibredb export (attempt {attempt + 1}/{max_retries}) after {delay}s delay...", flush=True)
time.sleep(delay)
else:
# Small initial delay to ensure database writes are flushed
time.sleep(0.5)
result = subprocess.run(
["calibredb", "export", "--with-library", self.calibre_library, "--to-dir", metadata_temp_dir, self.book_id],
env=self.calibre_env, check=False, capture_output=True, text=True, timeout=60
)
if result.returncode == 0:
temp_files = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(metadata_temp_dir) for f in filenames]
opf_files = [f for f in temp_files if f.endswith('.opf')]
if opf_files:
return opf_files[0]
else:
raise FileNotFoundError("No .opf file found after calibredb export")
else:
if attempt < max_retries - 1 and "database is locked" in result.stderr.lower():
continue # Retry on database lock
else:
raise subprocess.CalledProcessError(result.returncode, result.args, result.stdout, result.stderr)
except subprocess.TimeoutExpired:
if attempt < max_retries - 1:
continue
else:
raise
# If all retries failed
raise RuntimeError(f"Failed to export metadata for book {self.book_id} after {max_retries} attempts")
def export_as_dict(self) -> dict[str,str | None]:
return {
"book_dir":self.book_dir,
"file_path":self.file_path,
"calibre_library":self.calibre_library,
"file_format":self.file_format,
"timestamp":self.timestamp,
"book_id":self.book_id,
"book_title":self.book_title,
"author_name":self.author_name,
"title_author":self.title_author,
"cover_path":self.cover_path,
"old_metadata_path":self.old_metadata_path,
"new_metadata_path":self.new_metadata_path,
"log_info":self.log_info
}
class Enforcer:
def __init__(self, args):
self.db = CWA_DB()
self.cwa_settings = self.db.cwa_settings
self.enforcer_on = self.cwa_settings["auto_metadata_enforcement"]
self.supported_formats = ["epub", "azw3"]
self.args = args
self.calibre_library = self.get_calibre_library()
self.illegal_characters = ["<", ">", ":", '"', "/", "\\", "|", "?", "*"]
self.calibre_env = os.environ.copy()
# Enables Calibre plugins to be used from /config/plugins
self.calibre_env["HOME"] = "/config"
# Gets split library info from app.db and sets library dir to the split dir if split library is enabled
self.split_library = self.get_split_library()
if self.split_library:
self.calibre_library = self.split_library["split_path"]
self.calibre_env['CALIBRE_OVERRIDE_DATABASE_PATH'] = os.path.join(self.split_library["db_path"], "metadata.db")
# Read Calibre-Web setting: config_unicode_filename (True -> transliterate non-English in filenames)
try:
with sqlite3.connect("/config/app.db", timeout=60) as con:
cur = con.cursor()
self.unicode_filename = bool(cur.execute('SELECT config_unicode_filename FROM settings;').fetchone()[0])
except Exception:
self.unicode_filename = False
def _ascii_transliterate(self, s: str) -> str:
"""Transliterate non-English characters to ASCII when configured.
Prefer unidecode if available; otherwise use NFKD normalization and drop diacritics."""
if not s:
return s
if unidecode is not None:
return unidecode(s)
# Fallback transliteration
return unicodedata.normalize('NFKD', s).encode('ascii', 'ignore').decode('ascii')
def get_split_library(self) -> dict[str, str] | None:
"""Checks whether or not the user has split library enabled. Returns None if they don't and the path of the Split Library location if True."""
con = sqlite3.connect("/config/app.db", timeout=60)
cur = con.cursor()
split_library = cur.execute('SELECT config_calibre_split FROM settings;').fetchone()[0]
if split_library:
split_path = cur.execute('SELECT config_calibre_split_dir FROM settings;').fetchone()[0]
db_path = cur.execute('SELECT config_calibre_dir FROM settings;').fetchone()[0]
con.close()
return {
"split_path": split_path,
"db_path": db_path
}
else:
con.close()
return None
def get_calibre_library(self) -> str:
with open(dirs_json, 'r') as f:
dirs = json.load(f)
return dirs['calibre_library_dir'] # Returns without / on the end
def _recalculate_checksum_after_modification(self, book_id: str, file_format: str, file_path: str) -> None:
"""Calculate and store new checksum after modifying a book file."""
try:
# Import the checksum calculation function
import sys
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
from cps.progress_syncing.checksums import calculate_koreader_partial_md5, store_checksum, CHECKSUM_VERSION
# Calculate new checksum
checksum = calculate_koreader_partial_md5(file_path)
if not checksum:
print(f"[cover-metadata-enforcer] Warning: Failed to calculate checksum for {file_path}", flush=True)
return
# Store in database using centralized manager function
metadb_path = os.path.join(
(self.split_library or {}).get("db_path", self.calibre_library),
"metadata.db"
)
con = sqlite3.connect(metadb_path, timeout=60)
try:
success = store_checksum(
book_id=int(book_id),
book_format=file_format.upper(),
checksum=checksum,
version=CHECKSUM_VERSION,
db_connection=con
)
if success:
print(f"[cover-metadata-enforcer] Stored checksum {checksum[:8]}... for book {book_id} format {file_format} ({CHECKSUM_VERSION})", flush=True)
else:
print(f"[cover-metadata-enforcer] Warning: Failed to store checksum for book {book_id}", flush=True)
finally:
con.close()
except Exception as e:
print(f"[cover-metadata-enforcer] Warning: Failed to recalculate checksum: {e}", flush=True)
import traceback
print(traceback.format_exc(), flush=True)
def read_log(self, auto=True, log_path: str = "None") -> dict:
"""Reads pertinent information from the given log file, adds the book_id from the log name and returns the info as a dict.
Returns None if the file doesn't exist after retries (handles race conditions)."""
if auto:
file_path = f'{change_logs_dir}/{self.args.log}'
book_id = (self.args.log.split('-')[1]).split('.')[0]
timestamp_raw = self.args.log.split('-')[0]
else:
file_path = log_path
log_name = os.path.basename(log_path)
book_id = (log_name.split('-')[1]).split('.')[0]
timestamp_raw = log_name.split('-')[0]
try:
timestamp = datetime.strptime(timestamp_raw, '%Y%m%d%H%M%S')
except ValueError as e:
print(f"[cover-metadata-enforcer] ERROR: Invalid timestamp format in log filename: {e}", flush=True)
return None
# Retry logic to handle race conditions where file is detected but not yet fully written
max_retries = 3
retry_delay = 0.5 # seconds
for attempt in range(max_retries):
try:
# Check if file exists first
if not os.path.exists(file_path):
if attempt < max_retries - 1:
time.sleep(retry_delay)
continue
else:
print(f"[cover-metadata-enforcer] WARNING: Log file '{os.path.basename(file_path)}' not found after {max_retries} attempts. "
f"This may be due to a race condition or the file was already processed and deleted.", flush=True)
return None
# Try to read the file
with open(file_path, 'r', encoding='utf-8') as f:
log_info = json.load(f)
log_info['book_id'] = book_id
log_info['timestamp'] = timestamp.strftime('%Y-%m-%d %H:%M:%S')
return log_info
except FileNotFoundError:
if attempt < max_retries - 1:
time.sleep(retry_delay)
continue
else:
print(f"[cover-metadata-enforcer] WARNING: Log file '{os.path.basename(file_path)}' not found after {max_retries} attempts. "
f"This may be due to a race condition or the file was already processed and deleted.", flush=True)
return None
except json.JSONDecodeError as e:
if attempt < max_retries - 1:
# File might still be being written
time.sleep(retry_delay)
continue
else:
print(f"[cover-metadata-enforcer] ERROR: Failed to parse log file '{os.path.basename(file_path)}': {e}", flush=True)
return None
except Exception as e:
print(f"[cover-metadata-enforcer] ERROR: Unexpected error reading log file '{os.path.basename(file_path)}': {e}", flush=True)
return None
return None
def get_book_dir_from_log(self, log_info: dict) -> str:
"""Resolve the on-disk book directory prioritizing ones that contain supported files.
Order of preference: DB path -> any (id)-suffix dirs -> reconstructed ASCII/raw (based on config).
Within each, prefer the one that actually contains EPUB/AZW3. When config_unicode_filename is True,
prefer the ASCII path over a diacritic sibling if both exist."""
book_id = str(log_info['book_id']).strip()
candidate_dirs: list[str] = []
# 1) DB-based resolution (split-library aware)
try:
metadb_path = os.path.join(
(self.split_library or {}).get("db_path", self.calibre_library),
"metadata.db",
)
con = sqlite3.connect(metadb_path, timeout=60)
try:
cur = con.cursor()
row = cur.execute('SELECT path FROM books WHERE id = ?', (book_id,)).fetchone()
finally:
con.close()
if row and row[0]:
resolved = os.path.join(self.calibre_library, row[0])
resolved = resolved if resolved.endswith(os.sep) else resolved + os.sep
if os.path.isdir(resolved):
candidate_dirs.append(resolved)
if self.args and getattr(self.args, 'verbose', False):
print(f"[cover-metadata-enforcer] Candidate from DB: {resolved}", flush=True)
except Exception as e:
if self.args and getattr(self.args, 'verbose', False):
print(f"[cover-metadata-enforcer] WARN: DB lookup failed for id={book_id}: {e}", flush=True)
# 2) All directories that end with (book_id)
target_suffix = f"({book_id})"
try:
for dirpath, dirnames, _ in os.walk(self.calibre_library):
for d in dirnames:
if d.endswith(target_suffix):
p = os.path.join(dirpath, d)
p = p if p.endswith(os.sep) else p + os.sep
if os.path.isdir(p):
candidate_dirs.append(p)
if self.args and getattr(self.args, 'verbose', False):
if candidate_dirs:
print(f"[cover-metadata-enforcer] Found {len(candidate_dirs)} candidate(s) including DB/ID-search", flush=True)
except Exception:
pass
# 3) Reconstruct from log names using EXACT CW sanitization
raw_title = str(log_info.get('title', '')).strip()
# CW uses only the first author to build the folder
raw_author_full = str(log_info.get('authors', '')).strip().replace(' & ', ', ')
raw_author = raw_author_full.split(', ')[0] if ', ' in raw_author_full else raw_author_full
# Build both transliterated and non-transliterated variants using shared sanitizer
# Guard against empty/invalid values to avoid crashing on fresh/partial metadata
try:
title_ascii = get_valid_filename_shared(raw_title, chars=96, unicode_filename=True)
except Exception:
# Fallback: minimal safe title using book id
title_ascii = f"book_{book_id}"
try:
author_ascii = get_valid_filename_shared(raw_author, chars=96, unicode_filename=True)
except Exception:
author_ascii = "Unknown Author"
try:
title_raw = get_valid_filename_shared(raw_title, chars=96, unicode_filename=False)
except Exception:
title_raw = f"book_{book_id}"
try:
author_raw = get_valid_filename_shared(raw_author, chars=96, unicode_filename=False)
except Exception:
author_raw = "Unknown Author"
reconstructed_ascii = os.path.join(self.calibre_library, author_ascii, f"{title_ascii} ({book_id})")
reconstructed_raw = os.path.join(self.calibre_library, author_raw, f"{title_raw} ({book_id})")
# Prefer ASCII first when config demands transliteration
recon_order = [reconstructed_ascii, reconstructed_raw] if self.unicode_filename else [reconstructed_raw, reconstructed_ascii]
candidate_dirs.extend([(p if p.endswith(os.sep) else p + os.sep) for p in recon_order])
# Deduplicate while preserving order
seen = set()
deduped_candidates = []
for c in candidate_dirs:
if c not in seen:
seen.add(c)
deduped_candidates.append(c)
# Split into preferred vs alternate based on config_unicode_filename
def is_preferred(path: str) -> bool:
base = author_ascii if self.unicode_filename else author_raw
return path.startswith(os.path.join(self.calibre_library, base) + os.sep)
preferred_candidates = [c for c in deduped_candidates if is_preferred(c)]
alternate_candidates = [c for c in deduped_candidates if not is_preferred(c)]
# Choose the first candidate that exists and contains supported files (preferred first)
for group_name, group in (("preferred", preferred_candidates), ("alternate", alternate_candidates)):
for c in group:
if os.path.isdir(c):
sf = self.get_supported_files_from_dir(c)
if sf:
if self.args and getattr(self.args, 'verbose', False):
print(f"[cover-metadata-enforcer] Selected {group_name} candidate with supported files: {c}", flush=True)
log_info['file_path'] = c
return c
# If none have supported files, but some dirs exist, choose best available (prefer ASCII if exists)
existing_pref = [c for c in preferred_candidates if os.path.isdir(c)]
existing_alt = [c for c in alternate_candidates if os.path.isdir(c)]
existing = existing_pref or existing_alt
if existing:
# Try to pick ASCII-looking path if config is True
preferred = None
for c in existing_pref:
preferred = c
break
if not preferred:
preferred = existing[0]
if self.args and getattr(self.args, 'verbose', False):
print(f"[cover-metadata-enforcer] No supported files in candidates; falling back to existing dir: {preferred}", flush=True)
log_info['file_path'] = preferred
return preferred
# Nothing exists; fall back to reconstructed path that matches config
fallback = (reconstructed_ascii if self.unicode_filename else reconstructed_raw)
fallback = fallback if fallback.endswith(os.sep) else fallback + os.sep
if self.args and getattr(self.args, 'verbose', False):
print(f"[cover-metadata-enforcer] Resolved via reconstructed path (not found on disk): {fallback}", flush=True)
log_info['file_path'] = fallback
return fallback
def get_supported_files_from_dir(self, dir: str) -> list[str]:
""" Returns a list if the book dir given contains files of one or more of the supported formats"""
library_files = [os.path.join(dirpath, f) for (dirpath, dirnames, filenames) in os.walk(dir) for f in filenames]
supported_files = []
for format in self.supported_formats:
supported_files += [f for f in library_files if f.lower().endswith(f'.{format}')]
return supported_files
def enforce_cover(self, book_dir: str) -> list:
"""Will force the Cover & Metadata to update for the supported book files in the given directory"""
supported_files = self.get_supported_files_from_dir(book_dir)
if supported_files:
if len(supported_files) > 1:
print("[cover-metadata-enforcer] Multiple file formats for current book detected...", flush=True)
book_objects = []
for file in supported_files:
book = Book(book_dir, file)
self.replace_old_metadata(book.old_metadata_path, book.new_metadata_path)
# Use subprocess instead of os.system for better error handling
# Add small delay to ensure any file locks are released
time.sleep(0.5)
try:
if Path(book.cover_path).exists():
result = subprocess.run(
['ebook-polish', '-c', book.cover_path, '-o', book.new_metadata_path, '-U', file, file],
capture_output=True, text=True, timeout=120, check=False
)
else:
result = subprocess.run(
['ebook-polish', '-o', book.new_metadata_path, '-U', file, file],
capture_output=True, text=True, timeout=120, check=False
)
if result.returncode != 0:
print(f"[cover-metadata-enforcer] Warning: ebook-polish returned {result.returncode} for {file}", flush=True)
if result.stderr:
print(f"[cover-metadata-enforcer] Error output: {result.stderr.strip()}", flush=True)
except subprocess.TimeoutExpired:
print(f"[cover-metadata-enforcer] Error: ebook-polish timed out for {file}", flush=True)
except Exception as e:
print(f"[cover-metadata-enforcer] Error running ebook-polish for {file}: {e}", flush=True)
self.empty_metadata_temp()
print(f"[cover-metadata-enforcer]: DONE: '{book.title_author}.{book.file_format}': Cover & Metadata updated", flush=True)
# Calculate and store new checksum after modification
self._recalculate_checksum_after_modification(book.book_id, book.file_format, file)
book_objects.append(book)
return book_objects
else:
print(f"[cover-metadata-enforcer]: No supported file formats found in {book_dir}.", flush=True)
print("[cover-metadata-enforcer]: *** NOTICE **** Only EPUB & AZW3 formats are currently supported.", flush=True)
return []
def enforce_all_covers(self) -> tuple[int, float, int] | tuple[bool, bool, bool]:
"""Will force the covers and metadata to be re-generated for all books in the library"""
t_start = time.time()
supported_files = self.get_supported_files_from_dir(self.calibre_library)
if supported_files:
book_dirs = []
for file in supported_files:
book_dirs.append(os.path.dirname(file))
print(f"[cover-metadata-enforcer]: {len(book_dirs)} books detected in Library")
print(f"[cover-metadata-enforcer]: Enforcing covers for {len(supported_files)} supported file(s) in {self.calibre_library} ...")
successful_enforcements = len(supported_files)
for book_dir in book_dirs:
try:
book_objects = self.enforce_cover(book_dir)
if book_objects:
book_dicts = []
for book in book_objects:
book_dicts.append(book.export_as_dict())
self.db.enforce_add_entry_from_all(book_dicts)
except Exception as e:
print(f"[cover-metadata-enforcer]: ERROR: {book_dir}")
print(f"[cover-metadata-enforcer]: Skipping book due to following error: {e}")
successful_enforcements = successful_enforcements - 1
continue
t_end = time.time()
return successful_enforcements, (t_end - t_start), len(supported_files)
else: # No supported files found
return False, False, False
def replace_old_metadata(self, old_metadata: str, new_metadata: str) -> None:
"""Switches the metadata in metadata_temp with the metadata in the Calibre-Library"""
os.system(f'cp "{new_metadata}" "{old_metadata}"')
def print_library_list(self) -> None:
"""Uses the calibredb command line utility to list the books in the library"""
subprocess.run(["calibredb", "list", "--with-library", self.calibre_library], env=self.calibre_env, check=True)
def delete_log(self, auto=True, log_path="None"):
"""Deletes the log file"""
try:
if auto:
log = os.path.join(change_logs_dir, self.args.log)
os.remove(log)
else:
os.remove(log_path)
except FileNotFoundError:
# Log may already be removed by another process or cleanup path
return
def record_failed_enforcement(self, log_info: dict, error: Exception | str) -> None:
"""Record a failed enforcement attempt so admins can see it in stats."""
try:
# Ensure file_path exists for DB insert
if not log_info.get('file_path'):
log_info['file_path'] = "unknown"
self.db.enforce_add_entry_from_log(log_info, trigger_type="auto -log (failed)")
except Exception as e:
print(f"[cover-metadata-enforcer] WARNING: Unable to record failed enforcement: {e}", flush=True)
# Always surface the failure to logs
print(f"[cover-metadata-enforcer] ERROR: Failed to enforce metadata for '{log_info.get('title', 'Unknown')}' (book_id={log_info.get('book_id', 'unknown')}): {error}", flush=True)
def empty_metadata_temp(self):
"""Empties the metadata_temp folder"""
os.system(f"rm -r {metadata_temp_dir}/*")
def check_for_other_logs(self):
log_files = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk(change_logs_dir) for f in filenames]
if len(log_files) > 0:
print(f"[cover-metadata-enforcer] {len(log_files)} Additional metadata changes detected, processing now..", flush=True)
for log in log_files:
if log.endswith('.json'):
log_info = self.read_log(auto=False, log_path=log)
# Skip if log_info is None (file was deleted or invalid)
if log_info is None:
continue
book_dir = self.get_book_dir_from_log(log_info)
try:
book_objects = self.enforce_cover(book_dir)
if book_objects:
for book in book_objects:
book.log_info = log_info
book.log_info['file_path'] = book.file_path
self.db.enforce_add_entry_from_log(book.log_info)
else:
self.record_failed_enforcement(log_info, "No supported files or enforcement failed")
except Exception as e:
self.record_failed_enforcement(log_info, e)
finally:
self.delete_log(auto=False, log_path=log)
def _enforce_all(enforcer) -> None:
    """Enforce every supported book in the library and print a summary of the outcome."""
    print('[cover-metadata-enforcer]: Enforcing metadata and covers for all books in library...')
    n_enforced, completion_time, n_supported_files = enforcer.enforce_all_covers()
    # Identity check: ``0 == False`` is true in Python, so an equality test would report
    # "no supported files" when files were found but every enforcement failed.
    if n_enforced is False:
        print(f"\n[cover-metadata-enforcer]: No supported ebook files found in library (only EPUB & AZW3 formats are currently supported)")
    elif n_enforced == n_supported_files:
        print(f"\n[cover-metadata-enforcer]: SUCCESS: All covers & metadata successfully updated for all {n_enforced} supported ebooks in the library in {completion_time:.2f} seconds!")
    elif n_enforced == 0:
        print("\n[cover-metadata-enforcer]: FAILURE: Supported files found but none we're successfully enforced. See the log above for details.")
    elif n_enforced < n_supported_files:
        print(f"\n[cover-metadata-enforcer]: PARTIAL SUCCESS: Out of {n_supported_files} supported files detected, {n_enforced} were successfully enforced. See log above for details")


def _enforce_dir(enforcer, args) -> None:
    """Enforce the books in ``args.dir`` and record them in the enforcement database."""
    # endswith() is safe for an empty string, unlike indexing the last character.
    if args.dir.endswith('/'):
        args.dir = args.dir[:-1]
    if not os.path.isdir(args.dir):
        print(f"[cover-metadata-enforcer]: ERROR: '{args.dir}' is not a valid directory")
        return
    book_objects = enforcer.enforce_cover(args.dir)
    if book_objects:
        enforcer.db.enforce_add_entry_from_dir([book.export_as_dict() for book in book_objects])


def _enforce_from_log(enforcer) -> None:
    """Enforce the book described by the log given on the command line.

    Exits the process with code 0 when the log is missing or invalid, and with code 1
    when enforcement fails. The log is deleted in every case where it was read.
    """
    log_info = enforcer.read_log()
    # Handle case where log file doesn't exist (race condition)
    if log_info is None:
        print(f"[cover-metadata-enforcer] Skipping processing due to missing or invalid log file. This is normal if the file was already processed.")
        sys.exit(0)
    book_dir = enforcer.get_book_dir_from_log(log_info)

    if not enforcer.enforcer_on:  # Enforcer has been disabled in the CWA Settings
        print(f"[cover-metadata-enforcer] The CWA Automatic Metadata enforcement service is currently disabled in the settings. Therefore the metadata changes for {log_info['title'].replace(':', '_')} won't be enforced.\n\nThis means that the changes made will appear in the Web UI, but not be stored in the ebook files themselves.")
        enforcer.delete_log()
        return

    try:
        book_objects = enforcer.enforce_cover(book_dir)
        if not book_objects:
            print(f"[cover-metadata-enforcer] Metadata for '{log_info['title']}' not successfully enforced")
            enforcer.record_failed_enforcement(log_info, "No supported files or enforcement failed")
            enforcer.delete_log()
            # SystemExit is not an Exception subclass, so it passes through the handler below.
            sys.exit(1)
        for book in book_objects:
            book.log_info = log_info
            book.log_info['file_path'] = book.file_path
            enforcer.db.enforce_add_entry_from_log(book.log_info)
        enforcer.delete_log()
        enforcer.check_for_other_logs()
    except Exception as e:
        enforcer.record_failed_enforcement(log_info, e)
        enforcer.delete_log()
        sys.exit(1)


def main():
    """Parse the command line, run the single requested action and exit with code 0.

    Exactly one of ``--log``, ``--dir``, ``-all``, ``-list`` or ``-history`` is acted on;
    any other combination prints the usage line. Running without arguments prints the help.
    """
    parser = argparse.ArgumentParser(
        prog='cover-enforcer',
        description='Upon receiving a log, valid directory or an "-all" flag, this \
script will enforce the covers and metadata of the corresponding books, making \
sure that each are correctly stored in both the ebook files themselves as well as in the \
user\'s Calibre Library. Additionally, if an epub file happens to be in EPUB 2 \
format, it will also be automatically upgraded to EPUB 3.'
    )
    parser.add_argument('--log', action='store', dest='log', required=False, help='Will enforce the covers and metadata of the books in the given log file.', default=None)
    parser.add_argument('--dir', action='store', dest='dir', required=False, help='Will enforce the covers and metadata of the books in the given directory.', default=None)
    parser.add_argument('-all', action='store_true', dest='all', help='Will enforce covers & metadata for ALL books currently in your calibre-library-dir', default=False)
    parser.add_argument('-list', '-l', action='store_true', dest='list', help='List all books in your calibre-library-dir', default=False)
    parser.add_argument('-history', action='store_true', dest='history', help='Display a history of all enforcements ever carried out on your machine (not yet implemented)', default=False)
    parser.add_argument('-paths', '-p', action='store_true', dest='paths', help="Use with '-history' flag to display stored paths of all files in enforcement database", default=False)
    parser.add_argument('-v', '--verbose', action='store_true', dest='verbose', help="Use with history to display entire enforcement history instead of only the most recent 10 entries", default=False)
    args = parser.parse_args()
    enforcer = Enforcer(args)

    has_log = args.log is not None
    has_dir = args.dir is not None

    if len(sys.argv) == 1:
        parser.print_help()
    ######################### QUERY ARGS ###########################
    elif has_log and has_dir:
        ### log and dir provided together
        parser.print_usage()
    elif args.list and not (has_log or has_dir or args.all or args.history):
        ### only list flag passed
        enforcer.print_library_list()
    elif args.history and not (has_log or has_dir or args.all or args.list):
        ### only history flag passed
        enforcer.db.enforce_show(args.paths, args.verbose)
    ######################### ENFORCEMENT ARGS ###########################
    elif args.all and not (has_log or has_dir or args.list or args.history):
        ### only all flag passed
        _enforce_all(enforcer)
    elif has_dir and not (has_log or args.all or args.list or args.history):
        ### dir passed, no log, not all, no flags
        _enforce_dir(enforcer, args)
    elif has_log and not (has_dir or args.all or args.list or args.history):
        ### log passed: (args.log), no dir
        _enforce_from_log(enforcer)
    else:
        parser.print_usage()
    sys.exit(0)


if __name__ == "__main__":
    main()