cwa: harden ingest flow, manifest handling, and stale temp cleanup
Problems:
- Sidecar manifest files were being treated as ingest targets, causing premature deletion.
- Ignored/temporary ingest artifacts could be deleted too early when readiness checks timed out.
- Stale temp cleanup was hardcoded, not user-configurable, and required restarts to change behavior.

Solutions:
- Filtered manifest files in the ingest watcher and added processor guards to skip them.
- Added skip-delete handling for ignored/temporary files on readiness timeout to preserve artifacts.
- Implemented robust stale temp cleanup with age and interval settings.
- Persisted cleanup settings in the CWA database with sane defaults and validation.
- Exposed new cleanup controls in the settings UI and made the ingest service read live values from the database instead of environment variables.

Other changes:
- Centralized integer parsing and defaulting logic for the new settings.
- Added clear UI descriptions and bounds for the new cleanup options.
- Improved observability with explicit log messages for skip-delete behavior and cleanup timing.
This commit is contained in:
+14
-1
@@ -619,7 +619,7 @@ def set_cwa_settings():
|
||||
boolean_settings = []
|
||||
string_settings = []
|
||||
list_settings = []
|
||||
integer_settings = ['ingest_timeout_minutes', 'auto_send_delay_minutes', 'hardcover_auto_fetch_batch_size', 'hardcover_auto_fetch_schedule_hour', 'duplicate_scan_hour', 'duplicate_scan_chunk_size', 'duplicate_scan_debounce_seconds', 'duplicate_auto_resolve_cooldown_minutes'] # Special handling for integer settings
|
||||
integer_settings = ['ingest_timeout_minutes', 'ingest_stale_temp_minutes', 'ingest_stale_temp_interval', 'auto_send_delay_minutes', 'hardcover_auto_fetch_batch_size', 'hardcover_auto_fetch_schedule_hour', 'duplicate_scan_hour', 'duplicate_scan_chunk_size', 'duplicate_scan_debounce_seconds', 'duplicate_auto_resolve_cooldown_minutes'] # Special handling for integer settings
|
||||
float_settings = ['hardcover_auto_fetch_min_confidence', 'hardcover_auto_fetch_rate_limit'] # Special handling for float settings
|
||||
json_settings = ['metadata_provider_hierarchy', 'metadata_providers_enabled', 'duplicate_format_priority'] # Special handling for JSON settings
|
||||
skip_settings = ['auto_convert_ignored_formats', 'auto_ingest_ignored_formats', 'auto_convert_retained_formats'] # Handled through individual format checkboxes
|
||||
@@ -699,6 +699,10 @@ def set_cwa_settings():
|
||||
# Validate range
|
||||
if setting == 'ingest_timeout_minutes':
|
||||
int_value = max(5, min(120, int_value)) # Clamp between 5 and 120 minutes
|
||||
elif setting == 'ingest_stale_temp_minutes':
|
||||
int_value = max(0, min(10080, int_value)) # Clamp between 0 and 10080 minutes (7 days)
|
||||
elif setting == 'ingest_stale_temp_interval':
|
||||
int_value = max(0, min(86400, int_value)) # Clamp between 0 and 86400 seconds (24 hours)
|
||||
elif setting == 'auto_send_delay_minutes':
|
||||
int_value = max(1, min(60, int_value)) # Clamp between 1 and 60 minutes
|
||||
elif setting == 'hardcover_auto_fetch_batch_size':
|
||||
@@ -716,6 +720,10 @@ def set_cwa_settings():
|
||||
# Use current value if conversion fails
|
||||
if setting == 'ingest_timeout_minutes':
|
||||
result[setting] = cwa_db.cwa_settings.get(setting, 15) # Default to 15 minutes
|
||||
elif setting == 'ingest_stale_temp_minutes':
|
||||
result[setting] = cwa_db.cwa_settings.get(setting, 120) # Default to 120 minutes
|
||||
elif setting == 'ingest_stale_temp_interval':
|
||||
result[setting] = cwa_db.cwa_settings.get(setting, 600) # Default to 600 seconds
|
||||
elif setting == 'auto_send_delay_minutes':
|
||||
result[setting] = cwa_db.cwa_settings.get(setting, 5) # Default to 5 minutes
|
||||
elif setting == 'hardcover_auto_fetch_batch_size':
|
||||
@@ -727,6 +735,10 @@ def set_cwa_settings():
|
||||
else:
|
||||
if setting == 'ingest_timeout_minutes':
|
||||
result[setting] = cwa_db.cwa_settings.get(setting, 15) # Default to 15 minutes
|
||||
elif setting == 'ingest_stale_temp_minutes':
|
||||
result[setting] = cwa_db.cwa_settings.get(setting, 120) # Default to 120 minutes
|
||||
elif setting == 'ingest_stale_temp_interval':
|
||||
result[setting] = cwa_db.cwa_settings.get(setting, 600) # Default to 600 seconds
|
||||
elif setting == 'auto_send_delay_minutes':
|
||||
result[setting] = cwa_db.cwa_settings.get(setting, 5) # Default to 5 minutes
|
||||
elif setting == 'hardcover_auto_fetch_batch_size':
|
||||
@@ -848,6 +860,7 @@ def set_cwa_settings():
|
||||
getenv("HARDCOVER_TOKEN")
|
||||
)
|
||||
|
||||
|
||||
next_scan_run = get_next_duplicate_scan_run(cwa_settings)
|
||||
|
||||
return render_title_template("cwa_settings.html", title=_("Calibre-Web Automated User Settings"), page="cwa-settings",
|
||||
|
||||
@@ -300,6 +300,47 @@
|
||||
border-radius: 4px;
|
||||
background-color: #151e2680;">
|
||||
<small style="color: #bbbbbb; margin-left: 10px;">{{_('Range: 5-120 minutes (default: 15)')}}</small>
|
||||
|
||||
<div class="cwa-settings-tip" style="margin-top: 12px;">
|
||||
<small class="settings-explanation">
|
||||
💡 <strong>{{_('Stale temp cleanup')}}:</strong>
|
||||
{{_('Ignored temporary upload files (.uploading, .part, etc.) can be cleaned after a set age. Setting either value to 0 disables cleanup. Changes apply on the next cleanup cycle.')}}
|
||||
</small>
|
||||
</div>
|
||||
|
||||
<div style="margin-top: 10px;">
|
||||
<label for="ingest_stale_temp_minutes" class="settings-section-header" style="padding-right: 10px; margin-bottom: 6px !important; padding-bottom: 0px !important;">{{_('Stale temp age (minutes):')}}</label>
|
||||
<input type="number"
|
||||
name="ingest_stale_temp_minutes"
|
||||
id="ingest_stale_temp_minutes"
|
||||
value="{{ cwa_settings['ingest_stale_temp_minutes'] }}"
|
||||
min="0"
|
||||
max="10080"
|
||||
step="1"
|
||||
style="width: 120px;
|
||||
padding: 5px;
|
||||
border: 1px solid transparent;
|
||||
border-radius: 4px;
|
||||
background-color: #151e2680;">
|
||||
<small style="color: #bbbbbb; margin-left: 10px;">{{_('Range: 0-10080 minutes (default: 120)')}}</small>
|
||||
</div>
|
||||
|
||||
<div style="margin-top: 10px;">
|
||||
<label for="ingest_stale_temp_interval" class="settings-section-header" style="padding-right: 10px; margin-bottom: 6px !important; padding-bottom: 0px !important;">{{_('Stale temp cleanup interval (seconds):')}}</label>
|
||||
<input type="number"
|
||||
name="ingest_stale_temp_interval"
|
||||
id="ingest_stale_temp_interval"
|
||||
value="{{ cwa_settings['ingest_stale_temp_interval'] }}"
|
||||
min="0"
|
||||
max="86400"
|
||||
step="1"
|
||||
style="width: 120px;
|
||||
padding: 5px;
|
||||
border: 1px solid transparent;
|
||||
border-radius: 4px;
|
||||
background-color: #151e2680;">
|
||||
<small style="color: #bbbbbb; margin-left: 10px;">{{_('Range: 0-86400 seconds (default: 600)')}}</small>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Auto-Send Delay Setting -->
|
||||
|
||||
@@ -45,8 +45,49 @@ STABLE_CHECKS=${CWA_INGEST_STABLE_CHECKS:-6}
|
||||
STABLE_CONSEC_MATCH=${CWA_INGEST_STABLE_CONSEC_MATCH:-2}
|
||||
STABLE_INTERVAL=${CWA_INGEST_STABLE_INTERVAL:-0.5}
|
||||
MAX_QUEUE_SIZE=${CWA_INGEST_MAX_QUEUE_SIZE:-50}
|
||||
SUPPORTED_EXT_REGEX='(epub|mobi|azw3|azw|pdf|txt|rtf|cbz|cbr|cb7|cbc|fb2|fbz|docx|html|htmlz|lit|lrf|odt|prc|pdb|pml|rb|snb|tcr|txtz|kepub|m4b|m4a|mp4|acsm|kfx|kfx-zip|cwa.json)$'
|
||||
SUPPORTED_EXT_REGEX='(epub|mobi|azw3|azw|pdf|txt|rtf|cbz|cbr|cb7|cbc|fb2|fbz|docx|html|htmlz|lit|lrf|odt|prc|pdb|pml|rb|snb|tcr|txtz|kepub|m4b|m4a|mp4|acsm|kfx|kfx-zip)$'
|
||||
TEMP_SUFFIXES='crdownload download part uploading'
|
||||
# Print the configured stale-temp age threshold, in minutes, on stdout.
#
# The value is read live from the ingest_stale_temp_minutes column of the
# cwa_settings table in /config/cwa.db, using the sqlite3 CLI when it is
# installed and python3's sqlite3 module otherwise.
#
# Always prints exactly one integer: if the database, table, column or row is
# missing, or the stored value is not an integer, the default of 120 is
# printed instead. Both lookup paths now share this fallback (previously the
# sqlite3 CLI path printed an empty string when the table had no row).
get_stale_temp_minutes_from_db() {
    local minutes default_minutes=120
    if command -v sqlite3 >/dev/null 2>&1; then
        # "|| minutes=''" keeps a failed lookup from aborting the caller under `set -e`.
        minutes=$(sqlite3 /config/cwa.db "SELECT ingest_stale_temp_minutes FROM cwa_settings LIMIT 1;" 2>/dev/null) || minutes=""
    else
        minutes=$(python3 -c "
import sqlite3
try:
    conn = sqlite3.connect('/config/cwa.db')
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT ingest_stale_temp_minutes FROM cwa_settings LIMIT 1')
        result = cursor.fetchone()
    finally:
        conn.close()
    if result:
        print(result[0])
except Exception:
    pass
" 2>/dev/null) || minutes=""
    fi
    # Callers compare this value with `[ -le ]` and pass it to `find -mmin`,
    # so anything that is not a plain integer is replaced by the default.
    # A leading minus is accepted so that negative values keep meaning "disabled".
    if ! [[ "$minutes" =~ ^-?[0-9]+$ ]]; then
        minutes=$default_minutes
    fi
    echo "$minutes"
}
|
||||
|
||||
# Print the configured stale-temp cleanup interval, in seconds, on stdout.
#
# The value is read live from the ingest_stale_temp_interval column of the
# cwa_settings table in /config/cwa.db, using the sqlite3 CLI when it is
# installed and python3's sqlite3 module otherwise.
#
# Always prints exactly one integer: if the database, table, column or row is
# missing, or the stored value is not an integer, the default of 600 is
# printed instead. Both lookup paths now share this fallback (previously the
# sqlite3 CLI path printed an empty string when the table had no row).
get_stale_temp_interval_from_db() {
    local seconds default_seconds=600
    if command -v sqlite3 >/dev/null 2>&1; then
        # "|| seconds=''" keeps a failed lookup from aborting the caller under `set -e`.
        seconds=$(sqlite3 /config/cwa.db "SELECT ingest_stale_temp_interval FROM cwa_settings LIMIT 1;" 2>/dev/null) || seconds=""
    else
        seconds=$(python3 -c "
import sqlite3
try:
    conn = sqlite3.connect('/config/cwa.db')
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT ingest_stale_temp_interval FROM cwa_settings LIMIT 1')
        result = cursor.fetchone()
    finally:
        conn.close()
    if result:
        print(result[0])
except Exception:
    pass
" 2>/dev/null) || seconds=""
    fi
    # The value is compared with `[ -le ]` and handed to `sleep`, so anything
    # that is not a plain integer is replaced by the default.
    # A leading minus is accepted so that negative values keep meaning "disabled".
    if ! [[ "$seconds" =~ ^-?[0-9]+$ ]]; then
        seconds=$default_seconds
    fi
    echo "$seconds"
}
|
||||
|
||||
wait_for_stable_file() {
|
||||
local file="$1" last_size="" same_count=0 i sz
|
||||
@@ -76,6 +117,24 @@ run_fallback() {
|
||||
done
|
||||
}
|
||||
|
||||
# Delete stale temporary upload files from $WATCH_FOLDER.
#
# A file is removed when its name ends in one of $TEMP_SUFFIXES and it was
# last modified more than ingest_stale_temp_minutes minutes ago. Both cleanup
# settings are re-read from the database on every call.
#
# Does nothing (and returns 0) when:
#   - the age setting is 0, negative, empty or not a number,
#   - the interval setting is 0 or negative (the settings UI documents that a
#     zero in either value disables cleanup),
#   - $WATCH_FOLDER is not a directory.
#
# Logs one line when at least one file was deleted.
cleanup_stale_temps() {
    local minutes interval suf deleted deleted_any=0

    minutes=$(get_stale_temp_minutes_from_db)
    if ! [[ "$minutes" =~ ^[0-9]+$ ]] || [ "$minutes" -le 0 ] || [ ! -d "$WATCH_FOLDER" ]; then
        return 0
    fi

    # Only an explicit non-positive interval disables cleanup; an unreadable
    # interval does not, so a transient lookup problem cannot silently turn it off.
    interval=$(get_stale_temp_interval_from_db)
    if [[ "$interval" =~ ^-?[0-9]+$ ]] && [ "$interval" -le 0 ]; then
        return 0
    fi

    for suf in $TEMP_SUFFIXES; do
        # Capture find's output instead of piping it into `grep -q`: grep -q
        # exits on the first match, which can kill find with SIGPIPE before it
        # has deleted every stale file. "|| true" keeps a find error (e.g. a
        # file vanishing mid-scan) from aborting the caller under `set -e`.
        deleted=$(find "$WATCH_FOLDER" -type f -name "*.$suf" -mmin +"$minutes" -print -delete 2>/dev/null) || true
        if [ -n "$deleted" ]; then
            deleted_any=1
        fi
    done

    if [ "$deleted_any" -eq 1 ]; then
        echo "[cwa-ingest-service] Cleaned stale temp files older than ${minutes} minutes"
    fi
}
|
||||
|
||||
is_docker_desktop() {
|
||||
local osr mounts
|
||||
osr=$(cat /proc/sys/kernel/osrelease 2>/dev/null || true)
|
||||
@@ -137,11 +196,18 @@ handle_event() {
|
||||
for suf in $TEMP_SUFFIXES; do
|
||||
[[ "$filepath" == *.$suf ]] && return 0
|
||||
done
|
||||
# ignore sidecar manifests
|
||||
if [[ "$filepath" == *.cwa.json ]] || [[ "$filepath" == *.cwa.failed.json ]]; then
|
||||
echo "[cwa-ingest-service] Skipping sidecar manifest: $filepath (handled with data file)"
|
||||
return 0
|
||||
fi
|
||||
# extension filter
|
||||
if ! [[ "$filepath" =~ $SUPPORTED_EXT_REGEX ]]; then
|
||||
return 0
|
||||
fi
|
||||
|
||||
cleanup_stale_temps
|
||||
|
||||
echo "[cwa-ingest-service] New file detected - $filepath - Starting Ingest Processor..."
|
||||
echo "[cwa-ingest-service] Configured timeout: ${configured_timeout}s, Safety timeout: ${safety_timeout}s"
|
||||
echo "processing:$filename:$(date '+%Y-%m-%d %H:%M:%S')" > "$STATUS_FILE"
|
||||
@@ -204,6 +270,21 @@ if is_docker_desktop; then
|
||||
run_fallback; exit 0
|
||||
fi
|
||||
|
||||
# Background cleanup loop for stale temp files.
#
# Runs in a background subshell for the lifetime of the service. The interval
# is re-read from the database on every iteration, so a change made in the
# settings UI takes effect on the next cycle without a restart.
#
# The loop is always started: an interval that is 0, negative, empty or not a
# number is treated as "cleanup disabled for now" and re-checked every 60
# seconds, rather than disabling cleanup until the next restart or handing an
# invalid value to `sleep` (which would fail immediately and spin the loop).
(
    while true; do
        interval=$(get_stale_temp_interval_from_db)
        if [[ "$interval" =~ ^[0-9]+$ ]] && [ "$interval" -gt 0 ]; then
            cleanup_stale_temps
            sleep "$interval"
        else
            # Disabled or unreadable: poll for a settings change.
            sleep 60
        fi
    done
) &
|
||||
|
||||
( set -o pipefail
|
||||
s6-setuidgid abc inotifywait -m -r --format="%e %w%f" -e close_write -e moved_to "$WATCH_FOLDER" | \
|
||||
while read -r events filepath; do
|
||||
|
||||
+4
-2
@@ -416,7 +416,9 @@ class CWA_DB:
|
||||
'hardcover_auto_fetch_schedule_hour': 2,
|
||||
'hardcover_auto_fetch_min_confidence': 0.85,
|
||||
'hardcover_auto_fetch_batch_size': 50,
|
||||
'hardcover_auto_fetch_rate_limit': 5.0
|
||||
'hardcover_auto_fetch_rate_limit': 5.0,
|
||||
'ingest_stale_temp_minutes': 120,
|
||||
'ingest_stale_temp_interval': 600
|
||||
}
|
||||
|
||||
# Apply defaults for missing keys
|
||||
@@ -425,7 +427,7 @@ class CWA_DB:
|
||||
cwa_settings[key] = default_value
|
||||
|
||||
# Define which settings should remain as integers (not converted to boolean)
|
||||
integer_settings = ['ingest_timeout_minutes', 'auto_send_delay_minutes', 'hardcover_auto_fetch_batch_size', 'hardcover_auto_fetch_schedule_hour', 'duplicate_scan_hour', 'duplicate_scan_chunk_size', 'duplicate_scan_debounce_seconds']
|
||||
integer_settings = ['ingest_timeout_minutes', 'ingest_stale_temp_minutes', 'ingest_stale_temp_interval', 'auto_send_delay_minutes', 'hardcover_auto_fetch_batch_size', 'hardcover_auto_fetch_schedule_hour', 'duplicate_scan_hour', 'duplicate_scan_chunk_size', 'duplicate_scan_debounce_seconds']
|
||||
|
||||
# Define which settings should remain as floats (not converted to boolean)
|
||||
float_settings = ['hardcover_auto_fetch_min_confidence', 'hardcover_auto_fetch_rate_limit']
|
||||
|
||||
@@ -45,6 +45,8 @@ CREATE TABLE IF NOT EXISTS cwa_settings(
|
||||
auto_convert_retained_formats TEXT DEFAULT "" NOT NULL,
|
||||
auto_ingest_automerge TEXT DEFAULT "new_record" NOT NULL,
|
||||
ingest_timeout_minutes INTEGER DEFAULT 15 NOT NULL,
|
||||
ingest_stale_temp_minutes INTEGER DEFAULT 120 NOT NULL,
|
||||
ingest_stale_temp_interval INTEGER DEFAULT 600 NOT NULL,
|
||||
auto_metadata_enforcement SMALLINT DEFAULT 1 NOT NULL,
|
||||
kindle_epub_fixer SMALLINT DEFAULT 1 NOT NULL,
|
||||
auto_backup_epub_fixes SMALLINT DEFAULT 1 NOT NULL,
|
||||
|
||||
@@ -611,6 +611,10 @@ class NewBookProcessor:
|
||||
def delete_current_file(self) -> None:
|
||||
"""Deletes file just processed from ingest folder"""
|
||||
try:
|
||||
ext = Path(self.filename).suffix.replace('.', '')
|
||||
if ext in self.ingest_ignored_formats or self.filename.endswith(".cwa.json") or self.filename.endswith(".cwa.failed.json"):
|
||||
print(f"[ingest-processor] Skipping delete for ignored/temporary file: {self.filename}", flush=True)
|
||||
return
|
||||
if os.path.exists(self.filepath):
|
||||
os.remove(self.filepath) # Removes processed file
|
||||
else:
|
||||
@@ -1230,6 +1234,7 @@ def main(filepath=None):
|
||||
filepath = sys.argv[1]
|
||||
|
||||
nbp = None
|
||||
skip_delete = False
|
||||
try:
|
||||
##############################################################################################
|
||||
# Truncates the filename if it is too long
|
||||
@@ -1238,6 +1243,11 @@ def main(filepath=None):
|
||||
name, ext = os.path.splitext(filename)
|
||||
allowed_len = MAX_LENGTH - len(ext)
|
||||
|
||||
# Ignore sidecar manifests entirely (handled when the real file is processed)
|
||||
if filename.endswith(".cwa.json") or filename.endswith(".cwa.failed.json"):
|
||||
print(f"[ingest-processor] Skipping sidecar manifest file: {filename}", flush=True)
|
||||
return
|
||||
|
||||
if len(name) > allowed_len:
|
||||
new_name = name[:allowed_len] + ext
|
||||
new_path = os.path.join(os.path.dirname(filepath), new_name)
|
||||
@@ -1262,6 +1272,7 @@ def main(filepath=None):
|
||||
ready = nbp.is_file_in_use()
|
||||
if not ready:
|
||||
print(f"[ingest-processor] WARN: File did not become ready in time or vanished (after {timeout_minutes} minutes): {nbp.filename}", flush=True)
|
||||
skip_delete = True
|
||||
return
|
||||
|
||||
# Sidecar manifest handling for explicit actions (e.g., add_format)
|
||||
@@ -1314,6 +1325,7 @@ def main(filepath=None):
|
||||
if ext in nbp.ingest_ignored_formats:
|
||||
# Do NOT delete ignored temporary files; they may be renamed shortly (e.g. .uploading -> .epub)
|
||||
print(f"[ingest-processor] Skipping ignored/temporary file (no action taken): {nbp.filename}", flush=True)
|
||||
skip_delete = True
|
||||
return
|
||||
|
||||
if nbp.is_target_format: # File can just be imported
|
||||
@@ -1380,7 +1392,10 @@ def main(filepath=None):
|
||||
print(f"[ingest-processor] Error setting library permissions during cleanup: {e}", flush=True)
|
||||
|
||||
try:
|
||||
nbp.delete_current_file()
|
||||
if skip_delete:
|
||||
print(f"[ingest-processor] Skipping delete for ignored/temporary file: {nbp.filename}", flush=True)
|
||||
else:
|
||||
nbp.delete_current_file()
|
||||
except Exception as e:
|
||||
print(f"[ingest-processor] Error deleting current file during cleanup: {e}", flush=True)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user