Files

2461 lines
106 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Calibre-Web Automated fork of Calibre-Web
# Copyright (C) 2018-2026 Calibre-Web contributors
# Copyright (C) 2024-2026 Calibre-Web Automated contributors
# SPDX-License-Identifier: GPL-3.0-or-later
# See CONTRIBUTORS for full list of authors.
import sqlite3
import sys
import os
from sqlite3 import Error as sqlError
import re
from datetime import datetime
from tabulate import tabulate
class CWA_DB:
def __init__(self, verbose=False):
self.verbose = verbose
self.db_file = "cwa.db"
self.db_path = "/config/"
self.con, self.cur = self.connect_to_db() # type: ignore
# Support both Docker and CI environments for schema path
script_dir = os.path.dirname(os.path.abspath(__file__))
self.schema_path = os.path.join(script_dir, "cwa_schema.sql")
self.stats_tables = ["cwa_enforcement", "cwa_import", "cwa_conversions", "epub_fixes", "cwa_user_activity", "cwa_duplicate_cache", "cwa_duplicate_resolutions"]
self.tables, self.schema = self.make_tables()
self.cwa_default_settings = self.get_cwa_default_settings()
self.ensure_settings_schema_match()
self.match_stat_table_columns_with_schema()
self.ensure_scheduled_jobs_schema()
self.set_default_settings()
self.cwa_settings = self.get_cwa_settings()
def connect_to_db(self) -> tuple[sqlite3.Connection, sqlite3.Cursor] | None:
"""Establishes connection with the db or makes one if one doesn't already exist"""
con = None
cur = None
try:
con = sqlite3.connect(self.db_path + self.db_file, timeout=30)
except sqlError as e:
print(f"[cwa-db]: The following error occurred while trying to connect to the CWA Enforcement DB: {e}")
sys.exit(0)
if con:
cur = con.cursor()
if self.verbose:
print("[cwa-db]: Connection with the CWA Enforcement DB Successful!")
return con, cur
def make_tables(self) -> tuple[list[str], list[str]]:
"""Creates the tables for the CWA DB if they don't already exist"""
schema = []
with open(self.schema_path, 'r') as f:
for line in f:
if line != "\n":
schema.append(line)
tables = "".join(schema)
tables = tables.split(';')
tables.pop(-1)
for x in range(len(tables)):
tables[x] = tables[x] + ";"
for table in tables:
self.cur.execute(table)
self.con.commit()
return tables, schema
def _normalize_user_ids(self, user_id) -> list[int]:
"""Normalize user filters to a list of integer IDs."""
if user_id is None:
return []
if isinstance(user_id, (list, tuple, set)):
return [int(uid) for uid in user_id if uid is not None]
try:
return [int(user_id)]
except (TypeError, ValueError):
return []
def _has_user_filter(self, user_id) -> bool:
"""Return True when a valid user filter is provided."""
return len(self._normalize_user_ids(user_id)) > 0
def _build_user_filter(self, user_id) -> str:
"""Builds SQL filter for a single user ID or list of user IDs."""
user_ids = self._normalize_user_ids(user_id)
if not user_ids:
return ""
if len(user_ids) == 1:
return f" AND user_id = {user_ids[0]}"
user_ids_csv = ",".join(str(uid) for uid in user_ids)
return f" AND user_id IN ({user_ids_csv})"
def get_cwa_default_settings(self):
for table in self.tables:
if "cwa_settings" in table:
settings_table = table.strip()
break
settings_lines = []
for line in settings_table.split('\n'):
stripped = line.strip()
# Skip comment lines and empty lines
if line[:4] == " " and not stripped.startswith('--') and stripped:
settings_lines.append(stripped)
default_settings = {}
for line in settings_lines:
# Extract setting name and DEFAULT value more carefully
# Format: setting_name TYPE DEFAULT value [NOT NULL]
if ' DEFAULT ' not in line:
continue
setting_name = line.split()[0]
# Extract everything after DEFAULT
default_start = line.index(' DEFAULT ') + len(' DEFAULT ')
remainder = line[default_start:].strip()
# Handle different value types
if remainder.startswith("'"):
# String value with single quotes - could be '', or 'value', or JSON
# Find the matching closing quote
end_quote = remainder.index("'", 1)
setting_value = remainder[1:end_quote] # Extract content between quotes
elif ' ' in remainder:
# Value followed by other keywords (like NOT NULL)
setting_value = remainder.split()[0]
try:
setting_value = int(setting_value)
except ValueError:
pass
else:
# Simple value at end of line
setting_value = remainder
try:
setting_value = int(setting_value)
except ValueError:
pass
default_settings |= {setting_name:setting_value}
return default_settings
def ensure_settings_schema_match(self) -> None:
self.cur.execute("SELECT * FROM cwa_settings")
cwa_setting_names = [header[0] for header in self.cur.description]
# print(f"[cwa-db] DEBUG: Current available cwa_settings: {cwa_setting_names}")
# Add any settings present in the schema file but not in the db
newly_added_settings = []
for setting in self.cwa_default_settings.keys():
if setting not in cwa_setting_names:
success = self.add_missing_setting(setting)
if success:
print(f"[cwa-db] Setting '{setting}' successfully added to cwa.db!")
newly_added_settings.append(setting)
# Sync newly added settings with schema defaults
# This handles cases where schema default was updated after column was added
if newly_added_settings:
self.sync_new_settings_with_defaults(newly_added_settings)
# Fix for issue #903: Repair incorrectly parsed default values from older versions
self.fix_malformed_setting_values()
# Delete any settings in the db but not in the schema file
for setting in cwa_setting_names:
if setting not in self.cwa_default_settings.keys():
try:
print(f"[cwa-db] Deprecated setting found from previous version of CWA, removing setting '{setting}' from cwa.db...")
self.cur.execute(f"ALTER TABLE cwa_settings DROP COLUMN {setting}")
self.con.commit()
print(f"[cwa-db] Deprecated setting '{setting}' successfully removed from cwa.db!")
except Exception as e:
print(f"[cwa-db] The following error occurred when trying to remove {setting} from cwa.db:\n{e}")
def sync_new_settings_with_defaults(self, newly_added_settings) -> None:
"""Sync newly added settings to match schema defaults
This ensures that if a column was added with one default value, then the schema
was updated with a different default, existing databases get the new default.
"""
try:
# Get current values
self.cur.execute("SELECT * FROM cwa_settings")
headers = [header[0] for header in self.cur.description]
current_values = dict(zip(headers, self.cur.fetchone()))
# Update any newly added settings that don't match schema defaults
updates_made = []
for setting in newly_added_settings:
current_val = current_values.get(setting)
expected_val = self.cwa_default_settings.get(setting)
# Compare with type handling (int vs string)
if str(current_val) != str(expected_val):
self.cur.execute(f"UPDATE cwa_settings SET {setting}=?", (expected_val,))
updates_made.append(f"{setting}: {current_val} -> {expected_val}")
if updates_made:
self.con.commit()
print(f"[cwa-db] Synced {len(updates_made)} new setting(s) with schema defaults:")
for update in updates_made:
print(f"[cwa-db] - {update}")
except Exception as e:
print(f"[cwa-db] Warning: Failed to sync new settings with defaults: {e}")
def fix_malformed_setting_values(self) -> None:
"""Fix settings that may have been incorrectly parsed in older versions.
Issue #903: Old parser would save '' as literal two-quote string and truncate JSON.
This migration fixes existing databases with malformed values.
"""
try:
self.cur.execute("SELECT duplicate_scan_cron, duplicate_format_priority FROM cwa_settings")
row = self.cur.fetchone()
if not row:
return
cron_value, format_priority = row
fixes_made = []
# Fix duplicate_scan_cron if it's the literal string "''"
if cron_value == "''":
self.cur.execute("UPDATE cwa_settings SET duplicate_scan_cron = ''")
fixes_made.append("duplicate_scan_cron: removed literal quotes")
# Fix duplicate_format_priority if it's malformed (not valid JSON or missing formats)
if format_priority:
try:
import json
parsed = json.loads(format_priority)
# Check if it has at least the basic formats
if not isinstance(parsed, dict) or 'EPUB' not in parsed:
raise ValueError("Missing expected format data")
except (json.JSONDecodeError, ValueError):
# Reset to default if malformed
default_json = self.cwa_default_settings.get('duplicate_format_priority', '{}')
self.cur.execute("UPDATE cwa_settings SET duplicate_format_priority = ?", (default_json,))
fixes_made.append("duplicate_format_priority: reset to default due to malformed JSON")
if fixes_made:
self.con.commit()
print(f"[cwa-db] Fixed {len(fixes_made)} malformed setting value(s) from previous version:")
for fix in fixes_made:
print(f"[cwa-db] - {fix}")
except Exception as e:
print(f"[cwa-db] Warning: Failed to fix malformed setting values: {e}")
def add_missing_setting(self, setting) -> bool:
for line in self.schema:
match = re.findall(setting, line)
if match:
try:
command = line.replace('\n', '').strip()
# Skip SQL comments
if command.startswith('--') or not command:
continue
command = command.replace(',', ';')
with open('/config/.cwa_db_debug', 'a') as f:
f.write(command)
self.cur.execute(f"ALTER TABLE cwa_settings ADD {command}")
self.con.commit()
return True
except Exception as e:
print(f"[cwa-db] The following error occurred when trying to add {setting} to cwa.db:\n{e}")
return False
print(f"[cwa-db] Error adding new setting to cwa.db: {setting}: Matching setting could not be found in schema file")
return False
def match_stat_table_columns_with_schema(self) -> None:
""" Used to rename columns whose names have been changed in later versions and add columns added in later versions """
# Produces a dict with all of the column names for each table, from the existing DB
current_column_names = {}
for table in self.stats_tables:
try:
self.cur.execute(f"SELECT * FROM {table}")
setting_names = [header[0] for header in self.cur.description]
current_column_names |= {table:setting_names}
except sqlite3.OperationalError:
# Table might not exist yet if it's new, skip it for now
# It will be created by make_tables() if it doesn't exist
current_column_names |= {table: []}
# Produces a dict with all of the column names for each table, from the schema
column_names_in_schema = {}
for table in self.tables:
column_names = []
table_name = None # Reset for each table
table = table.split('\n')
for line in table:
if line[:27] == "CREATE TABLE IF NOT EXISTS ":
table_name = line[27:].replace('(', '').strip()
elif line[:4] == " ":
column_names.append(line.strip().split(' ')[0])
if table_name is not None: # Only add if table_name was actually found
column_names_in_schema[table_name] = column_names
for table in self.stats_tables:
# Skip if table wasn't found in current DB (it was just created empty)
if not current_column_names[table]:
continue
# Skip if table not found in schema (shouldn't happen but safety check)
if table not in column_names_in_schema:
print(f"[cwa-db] Warning: Table '{table}' in stats_tables but not found in schema")
continue
columns_added = False # Track if we added any columns this iteration
if len(current_column_names[table]) < len(column_names_in_schema[table]): # Adds new columns not yet in existing db
num_new_columns = len(column_names_in_schema[table]) - len(current_column_names[table])
for x in range(1, num_new_columns + 1):
if column_names_in_schema[table][-x] not in current_column_names[table]:
for line in self.schema:
matches = re.findall(column_names_in_schema[table][-x], line)
if matches:
# Extract column definition, remove trailing comma and SQL comments
new_column = line.strip()
if '--' in new_column:
new_column = new_column[:new_column.index('--')].strip()
new_column = new_column.rstrip(',')
self.cur.execute(f"ALTER TABLE {table} ADD COLUMN {new_column}")
self.con.commit()
print(f'[cwa-db] Missing Column detected in cwa.db. Added new column "{column_names_in_schema[table][-x]}" to table "{table}" in cwa.db')
columns_added = True
break # Found and added the column, move to next missing column
# Only check for column renames if we didn't just add columns
# (newly added columns are correct, don't try to rename them)
if not columns_added and len(current_column_names[table]) == len(column_names_in_schema[table]):
# Check if all columns exist but just in different order (SQLite ADD COLUMN always appends)
current_set = set(current_column_names[table])
schema_set = set(column_names_in_schema[table])
if current_set == schema_set:
# All columns exist, just in different order - this is fine, SQLite can't reorder
continue
# Columns differ, check for actual renames needed
for x in range(len(column_names_in_schema[table])):
if current_column_names[table][x] != column_names_in_schema[table][x]:
self.cur.execute(f"ALTER TABLE {table} RENAME COLUMN {current_column_names[table][x]} TO {column_names_in_schema[table][x]}")
self.con.commit()
print(f'[cwa-db] Fixed column mismatch between versions. Column "{current_column_names[table][x]}" in table "{table}" renamed to "{column_names_in_schema[table][x]}"', flush=True)
def set_default_settings(self, force=False) -> None:
"""Sets default settings for new tables and keeps track if the user is using the default settings or not.\n\n
If the argument 'force' is set to True, the function instead sets all settings to their default values"""
if force:
for setting in self.cwa_default_settings:
self.cur.execute(f"UPDATE cwa_settings SET {setting}=?;", (self.cwa_default_settings[setting],))
self.con.commit()
print("[cwa-db] CWA Default Settings successfully applied!")
return
try:
self.cur.execute("SELECT * FROM cwa_settings")
setting_names = [header[0] for header in self.cur.description]
current_settings = [dict(zip(setting_names,row)) for row in self.cur.fetchall()][0]
except IndexError:
print("[cwa-db]: No existing CWA settings detected, applying default CWA settings...")
for setting in self.cwa_default_settings:
self.cur.execute(f"UPDATE cwa_settings SET {setting}=?;", (self.cwa_default_settings[setting],))
self.con.commit()
print("[cwa-db] CWA Default Settings successfully applied!")
return
default_check = True
for setting in setting_names:
if setting == "default_settings":
continue
elif current_settings[setting] != self.cwa_default_settings[setting]:
default_check = False
self.cur.execute("UPDATE cwa_settings SET default_settings=0 WHERE default_settings=1;")
self.con.commit()
break
if default_check:
self.cur.execute("UPDATE cwa_settings SET default_settings=1 WHERE default_settings=0;")
self.con.commit()
if self.verbose:
print("[cwa-db] CWA Settings loaded successfully")
def get_cwa_settings(self) -> dict:
"""Gets the current cwa_settings values from the table of the same name in cwa.db and returns them as a dict"""
self.cur.execute("SELECT * FROM cwa_settings")
if self.cur.fetchall() == []: # If settings table is empty, populates it with default values
self.cur.execute("INSERT INTO cwa_settings DEFAULT VALUES;")
self.con.commit()
self.cur.execute("SELECT * FROM cwa_settings")
headers = [header[0] for header in self.cur.description]
cwa_settings = [dict(zip(headers,row)) for row in self.cur.fetchall()][0]
# Define default values for new columns (in case db doesn't have them yet)
schema_defaults = {
'hardcover_auto_fetch_enabled': 0,
'hardcover_auto_fetch_schedule': 'weekly',
'hardcover_auto_fetch_schedule_day': 'sunday',
'hardcover_auto_fetch_schedule_hour': 2,
'hardcover_auto_fetch_min_confidence': 0.85,
'hardcover_auto_fetch_batch_size': 50,
'hardcover_auto_fetch_rate_limit': 5.0,
'archived_cleanup_enabled': 1,
'archived_cleanup_schedule': 'daily',
'archived_cleanup_schedule_day': 'sunday',
'archived_cleanup_schedule_hour': 3,
'ingest_stale_temp_minutes': 120,
'ingest_stale_temp_interval': 600
}
# Apply defaults for missing keys
for key, default_value in schema_defaults.items():
if key not in cwa_settings:
cwa_settings[key] = default_value
# Define which settings should remain as integers (not converted to boolean)
integer_settings = ['ingest_timeout_minutes', 'ingest_stale_temp_minutes', 'ingest_stale_temp_interval', 'auto_send_delay_minutes', 'hardcover_auto_fetch_batch_size', 'hardcover_auto_fetch_schedule_hour', 'duplicate_scan_hour', 'duplicate_scan_chunk_size', 'duplicate_scan_debounce_seconds', 'archived_cleanup_schedule_hour']
# Define which settings should remain as floats (not converted to boolean)
float_settings = ['hardcover_auto_fetch_min_confidence', 'hardcover_auto_fetch_rate_limit']
# Define which settings should remain as JSON strings (not split by comma)
json_settings = ['metadata_provider_hierarchy', 'metadata_providers_enabled', 'duplicate_format_priority']
for header in headers:
if isinstance(cwa_settings[header], int) and header not in integer_settings and header not in float_settings:
cwa_settings[header] = bool(cwa_settings[header])
elif isinstance(cwa_settings[header], str) and ',' in cwa_settings[header] and header not in json_settings:
cwa_settings[header] = cwa_settings[header].split(',')
return cwa_settings
def update_cwa_settings(self, result) -> None:
"""Sets settings using POST request from set_cwa_settings()"""
for setting in result.keys():
if setting == "auto_convert_ignored_formats" or setting == "auto_ingest_ignored_formats" or setting == "auto_convert_retained_formats":
result[setting] = ','.join(result[setting])
# Skip updates for unset values to avoid NOT NULL constraint failures
if result[setting] is None:
continue
try:
# Use parameterized queries to safely handle non-English characters and quotes
self.cur.execute(f"UPDATE cwa_settings SET {setting}=?;", (result[setting],))
self.con.commit()
except Exception as e:
print(f"[CWA_DB] Error updating setting '{setting}' with value '{result[setting]}': {e}")
# Continue to next setting instead of failing completely
continue
self.set_default_settings()
def enforce_add_entry_from_log(self, log_info: dict, trigger_type: str = "auto -log"):
"""Adds an entry to the db from a change log file"""
self.cur.execute(
"INSERT INTO cwa_enforcement(timestamp, book_id, book_title, author, file_path, trigger_type) VALUES (?, ?, ?, ?, ?, ?);",
(log_info['timestamp'], log_info['book_id'], log_info['title'], log_info['authors'], log_info['file_path'], trigger_type)
)
self.con.commit()
def enforce_add_entry_from_dir(self, book_dicts: list[dict[str,str]]):
"""Adds an entry to the db when cover_enforcer is ran with a directory"""
for book in book_dicts:
self.cur.execute("INSERT INTO cwa_enforcement(timestamp, book_id, book_title, author, file_path, trigger_type) VALUES (?, ?, ?, ?, ?, ?);", (book['timestamp'], book['book_id'], book['book_title'], book['author_name'], book['file_path'], 'manual -dir'))
self.con.commit()
def enforce_add_entry_from_all(self, book_dicts: list[dict[str,str]]):
"""Adds an entry to the db when cover_enforcer is ran with the -all flag"""
for book in book_dicts:
self.cur.execute("INSERT INTO cwa_enforcement(timestamp, book_id, book_title, author, file_path, trigger_type) VALUES (?, ?, ?, ?, ?, ?);", (book['timestamp'], book['book_id'], book['book_title'], book['author_name'], book['file_path'], 'manual -all'))
self.con.commit()
def enforce_show(self, paths: bool, verbose: bool, web_ui=False):
results_no_path = self.cur.execute("SELECT timestamp, book_id, book_title, author, trigger_type FROM cwa_enforcement ORDER BY timestamp DESC;").fetchall()
results_with_path = self.cur.execute("SELECT timestamp, book_id, file_path FROM cwa_enforcement ORDER BY timestamp DESC;").fetchall()
if paths:
results = results_with_path
headers = ["Timestamp", "Book ID", "Book Title", "Book Author", "Trigger Type"]
else:
results = results_no_path
headers = ["Timestamp","Book ID", "Filepath"]
if verbose:
results.reverse()
if web_ui:
return results
else:
print(f"\n{tabulate(results, headers=headers, tablefmt='rounded_grid')}\n")
else:
newest_ten = []
x = 0
for result in results:
newest_ten.insert(0, result)
x += 1
if x == 10:
break
if web_ui:
return newest_ten
else:
print(f"\n{tabulate(newest_ten, headers=headers, tablefmt='rounded_grid')}\n")
def get_import_history(self, verbose: bool):
results = self.cur.execute("SELECT timestamp, filename, original_backed_up FROM cwa_import ORDER BY timestamp DESC;").fetchall()
if verbose:
results.reverse()
return results
else:
newest_ten = []
x = 0
for result in results:
newest_ten.insert(0, result)
x += 1
if x == 10:
break
return newest_ten
def get_conversion_history(self, verbose: bool):
results = self.cur.execute("SELECT timestamp, filename, original_format, end_format, original_backed_up FROM cwa_conversions ORDER BY timestamp DESC;").fetchall()
if verbose:
results.reverse()
return results
else:
newest_ten = []
x = 0
for result in results:
newest_ten.insert(0, result)
x += 1
if x == 10:
break
return newest_ten
def get_epub_fixer_history(self, fixes:bool, verbose: bool):
results_no_fixes = self.cur.execute("SELECT timestamp, filename, manually_triggered, num_of_fixes_applied, original_backed_up FROM epub_fixes ORDER BY timestamp DESC;").fetchall()
results_with_fixes = self.cur.execute("SELECT timestamp, filename, file_path, fixes_applied FROM epub_fixes ORDER BY timestamp DESC;").fetchall()
if fixes:
results = results_with_fixes
else:
results = results_no_fixes
if verbose:
results.reverse()
return results
else:
newest_ten = []
x = 0
for result in results:
newest_ten.insert(0, result)
x += 1
if x == 10:
break
return newest_ten
def import_add_entry(self, filename, original_backed_up):
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.cur.execute("INSERT INTO cwa_import(timestamp, filename, original_backed_up) VALUES (?, ?, ?);", (timestamp, filename, original_backed_up))
self.con.commit()
def conversion_add_entry(self, filename, original_format, end_format, original_backed_up): # TODO Add end_format - 22.11.2024 - Done?
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.cur.execute("INSERT INTO cwa_conversions(timestamp, filename, original_format, end_format, original_backed_up) VALUES (?, ?, ?, ?, ?);", (timestamp, filename, original_format, end_format, original_backed_up))
self.con.commit()
def epub_fixer_add_entry(self, filename, manually_triggered, num_of_fixes_applied, original_backed_up, file_path, fixes_applied=""):
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.cur.execute("INSERT INTO epub_fixes(timestamp, filename, manually_triggered, num_of_fixes_applied, original_backed_up, file_path, fixes_applied) VALUES (?, ?, ?, ?, ?, ?, ?);", (timestamp, filename, manually_triggered, num_of_fixes_applied, original_backed_up, file_path, fixes_applied))
self.con.commit()
def get_stat_totals(self) -> dict[str,int]:
totals = {"cwa_enforcement":0,
"cwa_conversions":0,
"epub_fixes":0}
for table in totals:
try:
totals[table] = self.cur.execute(f"SELECT count(*) FROM {table}").fetchone()[0]
except Exception as e:
print(f"[cwa-db] ERROR - The following error occurred when fetching stat totals:\n{e}")
return totals
# ==============================
# Scheduled Jobs (Auto-Send)
# ==============================
def ensure_scheduled_jobs_schema(self) -> None:
"""Add missing columns to cwa_scheduled_jobs if older table exists."""
try:
cols = [r[1] for r in self.cur.execute("PRAGMA table_info('cwa_scheduled_jobs')").fetchall()]
if cols:
if 'scheduler_job_id' not in cols:
self.cur.execute("ALTER TABLE cwa_scheduled_jobs ADD COLUMN scheduler_job_id TEXT DEFAULT ''")
self.con.commit()
except Exception:
# If table doesn't exist yet, it will be created from schema
pass
def scheduled_add_autosend(self, book_id: int, user_id: int, run_at_utc_iso: str, username: str, title: str) -> int | None:
"""Insert a scheduled auto-send job and return its row id."""
try:
created_at = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
self.cur.execute(
"""
INSERT INTO cwa_scheduled_jobs(job_type, book_id, user_id, username, title, run_at_utc, created_at_utc, state)
VALUES(?,?,?,?,?,?,?, 'scheduled')
""",
('auto_send', int(book_id), int(user_id), username, title, run_at_utc_iso, created_at)
)
self.con.commit()
return self.cur.lastrowid
except Exception as e:
print(f"[cwa-db] ERROR adding scheduled auto-send: {e}")
return None
def scheduled_add_job(self, job_type: str, run_at_utc_iso: str, username: str = 'System', title: str = '') -> int | None:
"""Insert a scheduled generic job (e.g., convert_library, epub_fixer) and return row id."""
try:
created_at = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
self.cur.execute(
"""
INSERT INTO cwa_scheduled_jobs(job_type, book_id, user_id, username, title, run_at_utc, created_at_utc, state)
VALUES(?,?,?,?,?,?,?, 'scheduled')
""",
(str(job_type), None, None, username, title, run_at_utc_iso, created_at)
)
self.con.commit()
return self.cur.lastrowid
except Exception as e:
print(f"[cwa-db] ERROR adding scheduled job '{job_type}': {e}")
return None
def scheduled_mark_dispatched(self, row_id: int) -> bool:
try:
# Only transition scheduled -> dispatched; ignore if already cancelled/dispatched
self.cur.execute("UPDATE cwa_scheduled_jobs SET state='dispatched' WHERE id=? AND state='scheduled'", (int(row_id),))
self.con.commit()
return self.cur.rowcount > 0
except Exception as e:
print(f"[cwa-db] ERROR marking scheduled job dispatched: {e}")
return False
def scheduled_mark_cancelled(self, row_id: int) -> None:
try:
self.cur.execute("UPDATE cwa_scheduled_jobs SET state='cancelled' WHERE id=?", (int(row_id),))
self.con.commit()
except Exception as e:
print(f"[cwa-db] ERROR marking scheduled job cancelled: {e}")
def scheduled_cancel_for_book(self, book_id: int) -> int:
"""Cancel all scheduled jobs (auto-send, etc.) for a specific book
Args:
book_id: The book ID whose scheduled jobs should be cancelled
Returns:
int: Number of jobs cancelled
"""
try:
self.cur.execute(
"UPDATE cwa_scheduled_jobs SET state='cancelled' WHERE book_id=? AND state='scheduled'",
(int(book_id),)
)
self.con.commit()
cancelled_count = self.cur.rowcount
if cancelled_count > 0:
print(f"[cwa-db] Cancelled {cancelled_count} scheduled job(s) for book {book_id}", flush=True)
return cancelled_count
except Exception as e:
print(f"[cwa-db] ERROR cancelling scheduled jobs for book {book_id}: {e}", flush=True)
return 0
def scheduled_update_job_id(self, row_id: int, scheduler_job_id: str) -> None:
try:
self.cur.execute("UPDATE cwa_scheduled_jobs SET scheduler_job_id=? WHERE id=?", (scheduler_job_id, int(row_id)))
self.con.commit()
except Exception as e:
print(f"[cwa-db] ERROR updating scheduler_job_id: {e}")
def scheduled_get_by_id(self, row_id: int):
try:
row = self.cur.execute("SELECT * FROM cwa_scheduled_jobs WHERE id=?", (int(row_id),)).fetchone()
if not row:
return None
cols = [d[0] for d in self.cur.description]
return dict(zip(cols, row))
except Exception as e:
print(f"[cwa-db] ERROR fetching scheduled job by id: {e}")
return None
def scheduled_get_upcoming_autosend(self, limit: int = 50):
try:
now_utc = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
rows = self.cur.execute(
"""
SELECT id, book_id, user_id, username, title, run_at_utc, state
FROM cwa_scheduled_jobs
WHERE job_type='auto_send' AND state='scheduled' AND run_at_utc >= ?
ORDER BY run_at_utc ASC
LIMIT ?
""",
(now_utc, int(limit))
).fetchall()
cols = [d[0] for d in self.cur.description]
return [dict(zip(cols, r)) for r in rows]
except Exception as e:
print(f"[cwa-db] ERROR fetching upcoming scheduled auto-sends: {e}")
return []
def scheduled_get_upcoming_by_type(self, job_type: str, limit: int = 50):
try:
now_utc = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
rows = self.cur.execute(
"""
SELECT id, job_type, book_id, user_id, username, title, run_at_utc, state
FROM cwa_scheduled_jobs
WHERE job_type=? AND state='scheduled' AND run_at_utc >= ?
ORDER BY run_at_utc ASC
LIMIT ?
""",
(str(job_type), now_utc, int(limit))
).fetchall()
cols = [d[0] for d in self.cur.description]
return [dict(zip(cols, r)) for r in rows]
except Exception as e:
print(f"[cwa-db] ERROR fetching upcoming scheduled jobs for {job_type}: {e}")
return []
def scheduled_get_pending_autosend(self):
"""Return all not-yet-dispatched auto-sends due in the future (for rehydration)."""
try:
now_utc = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
rows = self.cur.execute(
"""
SELECT id, book_id, user_id, username, title, run_at_utc
FROM cwa_scheduled_jobs
WHERE job_type='auto_send' AND state='scheduled' AND run_at_utc >= ?
ORDER BY run_at_utc ASC
""",
(now_utc,)
).fetchall()
cols = [d[0] for d in self.cur.description]
return [dict(zip(cols, r)) for r in rows]
except Exception as e:
print(f"[cwa-db] ERROR fetching pending scheduled auto-sends: {e}")
return []
def scheduled_get_pending_by_type(self, job_type: str):
"""Return all not-yet-dispatched jobs of given type due in the future (for rehydration)."""
try:
now_utc = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
rows = self.cur.execute(
"""
SELECT id, job_type, book_id, user_id, username, title, run_at_utc
FROM cwa_scheduled_jobs
WHERE job_type=? AND state='scheduled' AND run_at_utc >= ?
ORDER BY run_at_utc ASC
""",
(str(job_type), now_utc)
).fetchall()
cols = [d[0] for d in self.cur.description]
return [dict(zip(cols, r)) for r in rows]
except Exception as e:
print(f"[cwa-db] ERROR fetching pending scheduled jobs for {job_type}: {e}")
return []
def log_activity(self, user_id, user_name, event_type, item_id=None, item_title=None, extra_data=None):
"""Logs a user activity event to the database with device detection."""
try:
import json
# Parse extra_data if it's a string
if isinstance(extra_data, str):
try:
extra_data_dict = json.loads(extra_data)
except:
# If not JSON, treat as simple string (legacy format compatibility)
extra_data_dict = {'format': extra_data}
elif isinstance(extra_data, dict):
extra_data_dict = extra_data
else:
extra_data_dict = {}
# Add device type detection using User-Agent
try:
from flask import request
user_agent = request.headers.get('User-Agent', '').lower()
# Simple device type detection
if 'mobile' in user_agent or 'android' in user_agent or 'iphone' in user_agent:
device_type = 'mobile'
elif 'tablet' in user_agent or 'ipad' in user_agent:
device_type = 'tablet'
else:
device_type = 'desktop'
extra_data_dict['device_type'] = device_type
except:
# If flask context not available, skip device detection
pass
# Convert back to JSON string
extra_data_json = json.dumps(extra_data_dict) if extra_data_dict else None
self.cur.execute("""
INSERT INTO cwa_user_activity (user_id, user_name, event_type, item_id, item_title, extra_data)
VALUES (?, ?, ?, ?, ?, ?)
""", (user_id, user_name, event_type, item_id, item_title, extra_data_json))
self.con.commit()
except Exception as e:
print(f"[cwa-db] Error logging activity: {e}")
def get_active_users(self):
"""Returns list of distinct users who have activity logged."""
try:
self.cur.execute("""
SELECT DISTINCT user_id, COALESCE(user_name, 'Unknown User') as user_name
FROM cwa_user_activity
WHERE user_id IS NOT NULL
ORDER BY user_name ASC
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error fetching active users: {e}")
return []
def get_hourly_activity_heatmap(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns activity count by hour of day and day of week for heatmap visualization.
Returns list of tuples: (day_of_week, hour, count)
day_of_week: 0=Sunday, 1=Monday, ..., 6=Saturday
hour: 0-23
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
self.cur.execute(f"""
SELECT
CAST(strftime('%w', timestamp) AS INTEGER) as day_of_week,
CAST(strftime('%H', timestamp) AS INTEGER) as hour,
COUNT(*) as activity_count
FROM cwa_user_activity
WHERE {combined_filter}
GROUP BY day_of_week, hour
ORDER BY day_of_week, hour
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting hourly activity heatmap: {e}")
return []
def get_reading_velocity(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns books read per week for velocity trend chart.
Returns list of tuples: (week_start_date, books_read_count)
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 90 # Default to 90 days for velocity trends
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
self.cur.execute(f"""
SELECT
date(timestamp, 'weekday 0', '-6 days') as week_start,
COUNT(DISTINCT item_id) as books_read
FROM cwa_user_activity
WHERE event_type = 'READ'
AND {combined_filter}
GROUP BY week_start
ORDER BY week_start
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting reading velocity: {e}")
return []
def get_format_preferences(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns format preferences by user for stacked bar chart.
Returns list of tuples: (user_name, format, count)
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
self.cur.execute(f"""
SELECT
COALESCE(user_name, 'Unknown User') as user_name,
UPPER(COALESCE(
CASE WHEN json_valid(extra_data)
THEN json_extract(extra_data, '$.format')
ELSE extra_data
END,
'UNKNOWN'
)) as format,
COUNT(*) as count
FROM cwa_user_activity
WHERE event_type IN ('DOWNLOAD', 'READ', 'EMAIL')
AND {combined_filter}
GROUP BY user_name, format
ORDER BY user_name, count DESC
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting format preferences: {e}")
return []
def get_discovery_sources(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns count of book discoveries grouped by source.
Returns list of tuples: (source, count)
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
self.cur.execute(f"""
SELECT
COALESCE(
CASE WHEN json_valid(extra_data)
THEN json_extract(extra_data, '$.source')
ELSE NULL
END,
'direct'
) as source,
COUNT(*) as count
FROM cwa_user_activity
WHERE event_type IN ('READ', 'DOWNLOAD')
AND {combined_filter}
GROUP BY source
ORDER BY count DESC
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting discovery sources: {e}")
return []
def get_device_breakdown(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns activity count grouped by device type.
Returns list of tuples: (device_type, count)
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
self.cur.execute(f"""
SELECT
COALESCE(
CASE WHEN json_valid(extra_data)
THEN json_extract(extra_data, '$.device_type')
ELSE NULL
END,
'unknown'
) as device_type,
COUNT(*) as count
FROM cwa_user_activity
WHERE {combined_filter}
GROUP BY device_type
ORDER BY count DESC
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting device breakdown: {e}")
return []
def get_failed_logins(self, days=None, start_date=None, end_date=None):
"""Returns failed login attempts with details.
Returns list of tuples: (ip, username_attempted, timestamp, count)
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
self.cur.execute(f"""
SELECT
json_extract(extra_data, '$.ip') as ip_address,
json_extract(extra_data, '$.username_attempted') as username,
MAX(timestamp) as last_attempt,
COUNT(*) as attempt_count
FROM cwa_user_activity
WHERE event_type = 'LOGIN_FAILED'
AND {date_filter}
GROUP BY ip_address, username
ORDER BY attempt_count DESC, last_attempt DESC
LIMIT 20
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting failed logins: {e}")
return []
def get_library_growth(self, days=None, start_date=None, end_date=None):
"""Returns books added per day from Calibre metadata.db for library growth timeline.
Returns list of tuples: (date, books_added_count)
"""
try:
import sqlite3
# Connect to Calibre's metadata.db
metadata_db_path = "/calibre-library/metadata.db"
metadata_con = sqlite3.connect(metadata_db_path, timeout=10)
metadata_cur = metadata_con.cursor()
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 365 # Default to 1 year for growth chart
date_filter = f"timestamp >= date('now', '-{days} days')"
metadata_cur.execute(f"""
SELECT
date(timestamp) as add_date,
COUNT(*) as books_added
FROM books
WHERE timestamp IS NOT NULL
AND {date_filter}
GROUP BY add_date
ORDER BY add_date ASC
""")
result = metadata_cur.fetchall()
metadata_con.close()
return result
except Exception as e:
print(f"[cwa-db] Error getting library growth: {e}")
return []
def get_books_added_count(self, days=None, start_date=None, end_date=None):
"""Returns total books added in time period with trend comparison.
Returns dict with: total, trend
"""
try:
import sqlite3
# Connect to Calibre's metadata.db
metadata_db_path = "/calibre-library/metadata.db"
metadata_con = sqlite3.connect(metadata_db_path, timeout=10)
metadata_cur = metadata_con.cursor()
# Build date filter for current period
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
elif days:
date_filter = f"timestamp >= date('now', '-{days} days')"
else:
date_filter = "1=1" # All time - no filter
# Get current period count
metadata_cur.execute(f"""
SELECT COUNT(*) as total
FROM books
WHERE timestamp IS NOT NULL
AND {date_filter}
""")
current = metadata_cur.fetchone()
# Get previous period for trend comparison
if start_date and end_date:
# Calculate previous period of same duration
from datetime import datetime, timedelta
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
duration = (end_dt - start_dt).days
prev_start = (start_dt - timedelta(days=duration)).strftime('%Y-%m-%d')
prev_end = start_date
prev_filter = f"timestamp BETWEEN date('{prev_start}') AND date('{prev_end}')"
elif days:
prev_filter = f"timestamp >= date('now', '-{days * 2} days') AND timestamp < date('now', '-{days} days')"
else:
# All time - no previous period comparison
prev_filter = "1=0" # Returns 0
metadata_cur.execute(f"""
SELECT COUNT(*) as total
FROM books
WHERE timestamp IS NOT NULL
AND {prev_filter}
""")
previous = metadata_cur.fetchone()
total = current[0] or 0
prev_total = previous[0] or 0
# Calculate trend based on volume change
if prev_total > 0:
trend = ((total - prev_total) / prev_total * 100)
else:
trend = 0
metadata_con.close()
return {
'total': total,
'trend': round(trend, 1)
}
except Exception as e:
print(f"[cwa-db] Error getting books added count: {e}")
import traceback
traceback.print_exc()
return {
'total': 0,
'trend': 0
}
def get_library_formats(self, days=None, start_date=None, end_date=None):
"""Returns format distribution from Calibre metadata.db.
Args:
days: Number of days back from now (optional)
start_date: Start date string 'YYYY-MM-DD' (optional)
end_date: End date string 'YYYY-MM-DD' (optional)
Returns list of tuples: (format, count)
"""
try:
import sqlite3
# Connect to Calibre's metadata.db
metadata_db_path = "/calibre-library/metadata.db"
metadata_con = sqlite3.connect(metadata_db_path, timeout=10)
metadata_cur = metadata_con.cursor()
# Build date filter
if start_date and end_date:
date_filter = f"WHERE books.timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
elif days:
date_filter = f"WHERE books.timestamp >= date('now', '-{days} days')"
else:
date_filter = "" # No filter, show all time
metadata_cur.execute(f"""
SELECT
UPPER(data.format) as format,
COUNT(*) as count
FROM books
JOIN data ON books.id = data.book
{date_filter}
GROUP BY format
ORDER BY count DESC
""")
result = metadata_cur.fetchall()
metadata_con.close()
return result
except Exception as e:
print(f"[cwa-db] Error getting library formats: {e}")
import traceback
traceback.print_exc()
return []
def get_conversion_success_rate(self, days=None, start_date=None, end_date=None):
"""Returns conversion success statistics from cwa_conversions table.
Note: All entries in cwa_conversions are successful (failed conversions aren't logged)
Returns dict with: total, successful, failed, success_rate, trend
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
elif days:
date_filter = f"timestamp >= date('now', '-{days} days')"
else:
date_filter = "1=1" # All time - no filter
# Get current period stats - all logged conversions are successful
self.cur.execute(f"""
SELECT COUNT(*) as total
FROM cwa_conversions
WHERE {date_filter}
""")
current = self.cur.fetchone()
# Get previous period for trend comparison
if start_date and end_date:
# Calculate previous period of same duration
from datetime import datetime, timedelta
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
duration = (end_dt - start_dt).days
prev_start = (start_dt - timedelta(days=duration)).strftime('%Y-%m-%d')
prev_end = start_date
prev_filter = f"timestamp BETWEEN date('{prev_start}') AND date('{prev_end}')"
elif days:
prev_filter = f"timestamp >= date('now', '-{days * 2} days') AND timestamp < date('now', '-{days} days')"
else:
# All time - no previous period comparison
prev_filter = "1=0" # Returns 0
self.cur.execute(f"""
SELECT COUNT(*) as total
FROM cwa_conversions
WHERE {prev_filter}
""")
previous = self.cur.fetchone()
total = current[0] or 0
successful = total # All logged conversions are successful
failed = 0 # Failed conversions are not logged in cwa_conversions
success_rate = 100.0 if total > 0 else 0 # 100% of logged conversions succeeded
# Calculate trend based on volume change (not rate, since rate is always 100%)
prev_total = previous[0] or 0
if prev_total > 0:
trend = ((total - prev_total) / prev_total * 100)
else:
trend = 0
return {
'total': total,
'successful': successful,
'failed': failed,
'success_rate': round(success_rate, 1),
'trend': round(trend, 1) # Trend represents volume change
}
except Exception as e:
print(f"[cwa-db] Error getting conversion success rate: {e}")
import traceback
traceback.print_exc()
return {
'total': 0,
'successful': 0,
'failed': 0,
'success_rate': 0,
'trend': 0
}
def get_series_completion_stats(self, limit=10):
"""Returns largest series by book count from Calibre metadata.db.
Args:
limit: Number of series to return (default 10)
Returns: List of tuples: (series_name, book_count, highest_index)
"""
try:
import sqlite3
# Connect to Calibre's metadata.db
metadata_db_path = "/calibre-library/metadata.db"
metadata_con = sqlite3.connect(metadata_db_path, timeout=10)
metadata_cur = metadata_con.cursor()
# Query series with book counts and highest index, ordered by count
metadata_cur.execute(f"""
SELECT
s.name as series_name,
COUNT(DISTINCT bs.book) as book_count,
CAST(MAX(b.series_index) AS INTEGER) as highest_index
FROM series s
JOIN books_series_link bs ON s.id = bs.series
JOIN books b ON bs.book = b.id
GROUP BY s.id, s.name
ORDER BY book_count DESC, series_name ASC
LIMIT {limit}
""")
results = metadata_cur.fetchall()
metadata_con.close()
return results
except Exception as e:
print(f"[cwa-db] Error getting series completion stats: {e}")
import traceback
traceback.print_exc()
return []
def get_publication_year_distribution(self):
"""Returns distribution of books by publication year from Calibre metadata.db.
Returns: List of tuples: (year, count)
"""
try:
import sqlite3
# Connect to Calibre's metadata.db
metadata_db_path = "/calibre-library/metadata.db"
metadata_con = sqlite3.connect(metadata_db_path, timeout=10)
metadata_cur = metadata_con.cursor()
# Extract year from pubdate and count books
metadata_cur.execute("""
SELECT
CAST(strftime('%Y', pubdate) as INTEGER) as year,
COUNT(*) as count
FROM books
WHERE pubdate IS NOT NULL
AND pubdate != '0101-01-01 00:00:00+00:00'
AND pubdate != ''
GROUP BY year
HAVING year >= 1800 AND year <= 2030
ORDER BY year ASC
""")
results = metadata_cur.fetchall()
metadata_con.close()
return results
except Exception as e:
print(f"[cwa-db] Error getting publication year distribution: {e}")
import traceback
traceback.print_exc()
return []
def get_most_fixed_books(self, limit=10):
"""Returns books with most EPUB fixes applied from epub_fixes table.
Args:
limit: Number of books to return (default 10)
Returns: List of tuples: (filename, fix_count, fixes_applied, last_fixed, file_path)
"""
try:
self.cur.execute(f"""
SELECT
filename,
COUNT(*) as fix_count,
GROUP_CONCAT(num_of_fixes_applied, ', ') as total_fixes,
MAX(timestamp) as last_fixed,
file_path
FROM epub_fixes
GROUP BY filename
ORDER BY fix_count DESC, last_fixed DESC
LIMIT {limit}
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting most fixed books: {e}")
import traceback
traceback.print_exc()
return []
def get_session_duration_stats(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns session duration statistics by calculating time between LOGIN events.
Args:
days: Number of days back (optional)
start_date/end_date: Custom range 'YYYY-MM-DD' (takes precedence)
user_id: Filter by user (optional)
Returns: Dict with average_minutes, median_minutes, session_distribution
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
# Get LOGIN events ordered by user and time
self.cur.execute(f"""
WITH ordered_logins AS (
SELECT
user_id,
user_name,
timestamp,
LEAD(timestamp) OVER (PARTITION BY user_id ORDER BY timestamp) as next_login,
julianday(LEAD(timestamp) OVER (PARTITION BY user_id ORDER BY timestamp)) -
julianday(timestamp) as duration_days
FROM cwa_user_activity
WHERE event_type = 'LOGIN' AND {combined_filter}
)
SELECT
ROUND(AVG(duration_days * 24 * 60), 1) as avg_minutes,
duration_days * 24 * 60 as session_minutes
FROM ordered_logins
WHERE next_login IS NOT NULL
AND duration_days < 1 -- Ignore sessions > 24 hours
""")
results = self.cur.fetchall()
if not results:
return {'average_minutes': 0, 'median_minutes': 0, 'distribution': []}
# Calculate average
avg_result = self.cur.execute(f"""
WITH ordered_logins AS (
SELECT
julianday(LEAD(timestamp) OVER (PARTITION BY user_id ORDER BY timestamp)) -
julianday(timestamp) as duration_days
FROM cwa_user_activity
WHERE event_type = 'LOGIN' AND {combined_filter}
)
SELECT ROUND(AVG(duration_days * 24 * 60), 1) as avg_minutes
FROM ordered_logins
WHERE duration_days IS NOT NULL AND duration_days < 1
""").fetchone()
avg_minutes = avg_result[0] if avg_result and avg_result[0] else 0
# Get distribution for histogram (5-minute buckets)
self.cur.execute(f"""
WITH ordered_logins AS (
SELECT
julianday(LEAD(timestamp) OVER (PARTITION BY user_id ORDER BY timestamp)) -
julianday(timestamp) as duration_days
FROM cwa_user_activity
WHERE event_type = 'LOGIN' AND {combined_filter}
)
SELECT
CAST((duration_days * 24 * 60) / 5 AS INTEGER) * 5 as bucket_start,
COUNT(*) as count
FROM ordered_logins
WHERE duration_days IS NOT NULL AND duration_days < 1
GROUP BY bucket_start
ORDER BY bucket_start
""")
distribution = self.cur.fetchall()
return {
'average_minutes': avg_minutes,
'distribution': distribution # List of (bucket_start_minutes, count)
}
except Exception as e:
print(f"[cwa-db] Error getting session duration stats: {e}")
import traceback
traceback.print_exc()
return {'average_minutes': 0, 'distribution': []}
def get_search_success_rate(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns search success rate (searches followed by DOWNLOAD/READ within 5 minutes).
Args:
days: Number of days back (optional)
start_date/end_date: Custom range 'YYYY-MM-DD' (takes precedence)
user_id: Filter by user (optional)
Returns: Dict with total_searches, successful_searches, success_rate, trend
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
date_filter_prev = None
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
date_filter_prev = f"timestamp >= date('now', '-{days * 2} days') AND timestamp < date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
# Count total searches in period
total_searches = self.cur.execute(f"""
SELECT COUNT(*)
FROM cwa_user_activity
WHERE event_type = 'SEARCH' AND {combined_filter}
""").fetchone()[0]
# Count successful searches (followed by DOWNLOAD or READ within 5 minutes)
successful_searches = self.cur.execute(f"""
SELECT COUNT(DISTINCT s.id)
FROM cwa_user_activity s
WHERE s.event_type = 'SEARCH'
AND {combined_filter}
AND EXISTS (
SELECT 1 FROM cwa_user_activity a
WHERE a.user_id = s.user_id
AND a.event_type IN ('DOWNLOAD', 'READ')
AND a.timestamp BETWEEN s.timestamp AND datetime(s.timestamp, '+5 minutes')
)
""").fetchone()[0]
success_rate = (successful_searches / total_searches * 100) if total_searches > 0 else 0
# Calculate trend if we have previous period
trend = 0
if date_filter_prev:
combined_filter_prev = date_filter_prev + user_filter
total_prev = self.cur.execute(f"""
SELECT COUNT(*)
FROM cwa_user_activity
WHERE event_type = 'SEARCH' AND {combined_filter_prev}
""").fetchone()[0]
successful_prev = self.cur.execute(f"""
SELECT COUNT(DISTINCT s.id)
FROM cwa_user_activity s
WHERE s.event_type = 'SEARCH'
AND {combined_filter_prev}
AND EXISTS (
SELECT 1 FROM cwa_user_activity a
WHERE a.user_id = s.user_id
AND a.event_type IN ('DOWNLOAD', 'READ')
AND a.timestamp BETWEEN s.timestamp AND datetime(s.timestamp, '+5 minutes')
)
""").fetchone()[0]
success_rate_prev = (successful_prev / total_prev * 100) if total_prev > 0 else 0
trend = success_rate - success_rate_prev if success_rate_prev > 0 else 0
return {
'total_searches': total_searches,
'successful_searches': successful_searches,
'success_rate': round(success_rate, 1),
'trend': round(trend, 1)
}
except Exception as e:
print(f"[cwa-db] Error getting search success rate: {e}")
import traceback
traceback.print_exc()
return {
'total_searches': 0,
'successful_searches': 0,
'success_rate': 0,
'trend': 0
}
def get_shelf_activity_stats(self, days=None, start_date=None, end_date=None, user_id=None, limit=10):
"""Returns most active shelves by number of additions.
Args:
days: Number of days back (optional)
start_date/end_date: Custom range 'YYYY-MM-DD' (takes precedence)
user_id: Filter by user (optional)
limit: Number of shelves to return (default 10)
Returns: List of tuples: (shelf_name, add_count, remove_count, net_change)
"""
try:
import json
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
# Get shelf activity (parse shelf_name from extra_data JSON)
self.cur.execute(f"""
SELECT
json_extract(extra_data, '$.shelf_name') as shelf_name,
SUM(CASE WHEN event_type = 'SHELF_ADD' THEN 1 ELSE 0 END) as add_count,
SUM(CASE WHEN event_type = 'SHELF_REMOVE' THEN 1 ELSE 0 END) as remove_count,
SUM(CASE
WHEN event_type = 'SHELF_ADD' THEN 1
WHEN event_type = 'SHELF_REMOVE' THEN -1
ELSE 0
END) as net_change,
SUM(CASE WHEN event_type = 'MAGIC_SHELF_VIEW' THEN 1 ELSE 0 END) as view_count,
json_extract(extra_data, '$.shelf_type') as shelf_type
FROM cwa_user_activity
WHERE event_type IN ('SHELF_ADD', 'SHELF_REMOVE', 'MAGIC_SHELF_VIEW')
AND {combined_filter}
AND json_extract(extra_data, '$.shelf_name') IS NOT NULL
GROUP BY shelf_name
ORDER BY (add_count + view_count) DESC, net_change DESC
LIMIT {limit}
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting shelf activity stats: {e}")
import traceback
traceback.print_exc()
return []
def get_api_usage_breakdown(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns API usage breakdown by category (Web, Kobo, OPDS, Email).
Args:
days: Number of days back (optional)
start_date/end_date: Custom range 'YYYY-MM-DD' (takes precedence)
user_id: Filter by user (optional)
Returns: List of tuples: (category, count)
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
# Categorize events
self.cur.execute(f"""
SELECT
CASE
WHEN event_type = 'KOBO_SYNC' THEN 'Kobo Sync'
WHEN event_type = 'OPDS_ACCESS' THEN 'OPDS Feed'
WHEN event_type = 'EMAIL' THEN 'Email Delivery'
WHEN event_type IN ('DOWNLOAD', 'READ', 'SEARCH', 'LOGIN') THEN 'Web UI'
ELSE 'Other'
END as category,
COUNT(*) as count
FROM cwa_user_activity
WHERE {combined_filter}
GROUP BY category
ORDER BY count DESC
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting API usage breakdown: {e}")
import traceback
traceback.print_exc()
return []
def get_endpoint_frequency_grouped(self, days=None, start_date=None, end_date=None, user_id=None, limit=20):
"""Returns endpoint access frequency with grouping by category.
Args:
days: Number of days back (optional)
start_date/end_date: Custom range 'YYYY-MM-DD' (takes precedence)
user_id: Filter by user (optional)
limit: Number of endpoints to return (default 20)
Returns: List of tuples: (endpoint, category, count, last_accessed)
"""
try:
import json
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
# Debug: Check what event types actually exist
debug_query = f"SELECT DISTINCT event_type FROM cwa_user_activity WHERE {combined_filter}"
print(f"[cwa-db] Checking event types with filter: {debug_query}")
self.cur.execute(debug_query)
event_types = self.cur.fetchall()
print(f"[cwa-db] Found event types: {event_types}")
# Debug: Check what events exist
query = f"""
SELECT
CASE
WHEN extra_data IS NOT NULL AND extra_data != ''
AND json_valid(extra_data) = 1
AND json_extract(extra_data, '$.endpoint') IS NOT NULL
THEN json_extract(extra_data, '$.endpoint')
ELSE event_type
END as endpoint,
CASE
WHEN event_type = 'KOBO_SYNC' THEN 'Kobo'
WHEN event_type = 'OPDS_ACCESS' THEN 'OPDS'
WHEN event_type = 'EMAIL' THEN 'Email'
WHEN event_type = 'DOWNLOAD' THEN 'Downloads'
WHEN event_type = 'READ' THEN 'Reading'
WHEN event_type = 'SEARCH' THEN 'Search'
WHEN event_type = 'LOGIN' THEN 'Authentication'
ELSE 'Other'
END as category,
COUNT(*) as access_count,
MAX(timestamp) as last_accessed
FROM cwa_user_activity
WHERE {combined_filter}
GROUP BY endpoint, category
HAVING COUNT(*) > 0
ORDER BY access_count DESC, last_accessed DESC
LIMIT {limit}
"""
print(f"[cwa-db] Endpoint frequency query: {query}")
self.cur.execute(query)
results = self.cur.fetchall()
print(f"[cwa-db] Endpoint frequency results: {results}")
return results
except Exception as e:
print(f"[cwa-db] Error getting endpoint frequency: {e}")
import traceback
traceback.print_exc()
return []
def get_api_timing_heatmap(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns API activity timing for heatmap (hour × day of week).
Args:
days: Number of days back (optional)
start_date/end_date: Custom range 'YYYY-MM-DD' (takes precedence)
user_id: Filter by user (optional)
Returns: List of tuples: (day_of_week, hour, count)
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
# Get API activity by time (focus on API events)
self.cur.execute(f"""
SELECT
CAST(strftime('%w', timestamp) AS INTEGER) as day_of_week,
CAST(strftime('%H', timestamp) AS INTEGER) as hour,
COUNT(*) as api_count
FROM cwa_user_activity
WHERE event_type IN ('KOBO_SYNC', 'OPDS_ACCESS', 'EMAIL', 'DOWNLOAD')
AND {combined_filter}
GROUP BY day_of_week, hour
ORDER BY day_of_week, hour
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting API timing heatmap: {e}")
import traceback
traceback.print_exc()
return []
def get_rating_statistics(self, days=None, start_date=None, end_date=None):
"""Returns rating statistics from metadata.db.
Args:
days: Number of days back (optional) - filters books added in period
start_date/end_date: Custom range 'YYYY-MM-DD' (takes precedence)
Returns: Dict with:
- average_rating: float (0-5 scale)
- rating_distribution: [(stars, count), ...] sorted by stars descending
- unrated_percentage: float
- trend: float (percentage change from previous period)
"""
try:
import sqlite3
metadata_db_path = "/calibre-library/metadata.db"
metadata_con = sqlite3.connect(metadata_db_path, timeout=10)
metadata_cur = metadata_con.cursor()
# Build date filter for books added in time period
if start_date and end_date:
date_filter = f"WHERE b.timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
# Calculate previous period for trend
from datetime import datetime, timedelta
start_dt = datetime.strptime(start_date, '%Y-%m-%d')
end_dt = datetime.strptime(end_date, '%Y-%m-%d')
period_days = (end_dt - start_dt).days
prev_start = (start_dt - timedelta(days=period_days)).strftime('%Y-%m-%d')
prev_end = start_date
prev_date_filter = f"WHERE b.timestamp BETWEEN date('{prev_start}') AND date('{prev_end}', '+1 day')"
elif days:
date_filter = f"WHERE b.timestamp >= date('now', '-{days} days')"
prev_date_filter = f"WHERE b.timestamp >= date('now', '-{days * 2} days') AND b.timestamp < date('now', '-{days} days')"
else:
date_filter = ""
prev_date_filter = ""
# Get average rating (convert 0-10 scale to 0-5)
avg_query = f"""
SELECT AVG(r.rating) / 2.0 as avg_rating
FROM books b
JOIN books_ratings_link brl ON b.id = brl.book
JOIN ratings r ON brl.rating = r.id
{date_filter.replace('WHERE', 'WHERE' if date_filter else '') if date_filter else ''}
{'AND' if date_filter else 'WHERE'} r.rating > 0
"""
metadata_cur.execute(avg_query)
avg_result = metadata_cur.fetchone()
average_rating = round(avg_result[0], 2) if avg_result and avg_result[0] else 0.0
# Get previous period average for trend
if prev_date_filter:
prev_avg_query = f"""
SELECT AVG(r.rating) / 2.0 as avg_rating
FROM books b
JOIN books_ratings_link brl ON b.id = brl.book
JOIN ratings r ON brl.rating = r.id
{prev_date_filter}
AND r.rating > 0
"""
metadata_cur.execute(prev_avg_query)
prev_avg_result = metadata_cur.fetchone()
prev_average = prev_avg_result[0] if prev_avg_result and prev_avg_result[0] else 0.0
if prev_average > 0:
trend = round(((average_rating - prev_average) / prev_average) * 100, 1)
else:
trend = 0.0
else:
trend = 0.0
# Get rating distribution (1-5 stars)
dist_query = f"""
SELECT
CAST(r.rating / 2 AS INTEGER) as stars,
COUNT(*) as count
FROM books b
JOIN books_ratings_link brl ON b.id = brl.book
JOIN ratings r ON brl.rating = r.id
{date_filter}
{'AND' if date_filter else 'WHERE'} r.rating > 0
GROUP BY stars
ORDER BY stars DESC
"""
metadata_cur.execute(dist_query)
rating_distribution = metadata_cur.fetchall()
# Get total books and unrated count
total_query = f"SELECT COUNT(*) FROM books b {date_filter}"
metadata_cur.execute(total_query)
total_books = metadata_cur.fetchone()[0]
unrated_query = f"""
SELECT COUNT(*)
FROM books b
LEFT JOIN books_ratings_link brl ON b.id = brl.book
LEFT JOIN ratings r ON brl.rating = r.id
{date_filter}
{'AND' if date_filter else 'WHERE'} (brl.rating IS NULL OR r.rating = 0)
"""
metadata_cur.execute(unrated_query)
unrated_books = metadata_cur.fetchone()[0]
unrated_percentage = round((unrated_books / total_books * 100), 1) if total_books > 0 else 0.0
metadata_con.close()
return {
'average_rating': average_rating,
'rating_distribution': rating_distribution,
'unrated_percentage': unrated_percentage,
'trend': trend,
'total_books': total_books,
'rated_books': total_books - unrated_books
}
except Exception as e:
print(f"[cwa-db] Error getting rating statistics: {e}")
import traceback
traceback.print_exc()
return {
'average_rating': 0.0,
'rating_distribution': [],
'unrated_percentage': 0.0,
'trend': 0.0,
'total_books': 0,
'rated_books': 0
}
def get_top_enforced_books(self, limit=10):
"""Returns top books by enforcement count (cross-database query).
Args:
limit: Number of top books to return (default 10, max 10000)
Returns: List of tuples: (book_id, title, enforcement_count, last_enforced)
"""
try:
import sqlite3
# Limit to reasonable size
limit = min(limit, 10000)
# Pass 1: Get enforcement counts from cwa.db
self.cur.execute(f"""
SELECT
book_id,
COUNT(DISTINCT file_path) as enforcement_count,
MAX(timestamp) as last_enforced
FROM cwa_enforcement
GROUP BY book_id
ORDER BY enforcement_count DESC
LIMIT {limit}
""")
enforcement_data = self.cur.fetchall()
if not enforcement_data:
return []
# Pass 2: Enrich with book titles from metadata.db
metadata_db_path = "/calibre-library/metadata.db"
metadata_con = sqlite3.connect(metadata_db_path, timeout=10)
metadata_cur = metadata_con.cursor()
results = []
for book_id, enforcement_count, last_enforced in enforcement_data:
try:
metadata_cur.execute("SELECT title FROM books WHERE id = ?", (book_id,))
title_result = metadata_cur.fetchone()
if title_result:
results.append((book_id, title_result[0], enforcement_count, last_enforced))
except Exception as e:
print(f"[cwa-db] Error getting title for book_id {book_id}: {e}")
# Include with placeholder title
results.append((book_id, f"Book #{book_id}", enforcement_count, last_enforced))
metadata_con.close()
return results
except Exception as e:
print(f"[cwa-db] Error getting top enforced books: {e}")
import traceback
traceback.print_exc()
return []
def get_import_source_flows(self, limit=15):
"""Returns format conversion flows for Sankey diagram.
Args:
limit: Number of top flows to return (default 15)
Returns: List of tuples: (source_format, target_format, count)
"""
try:
# Get conversion flows from cwa_conversions table
self.cur.execute(f"""
SELECT
UPPER(original_format) as source,
UPPER(end_format) as target,
COUNT(*) as value
FROM cwa_conversions
WHERE original_format != ''
AND original_format IS NOT NULL
AND end_format != ''
AND end_format IS NOT NULL
GROUP BY source, target
ORDER BY value DESC
LIMIT {limit}
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting import source flows: {e}")
import traceback
traceback.print_exc()
return []
def get_hourly_activity_heatmap(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns activity count by hour of day and day of week for heatmap visualization.
Returns list of tuples: (day_of_week, hour, count)
day_of_week: 0=Sunday, 1=Monday, ..., 6=Saturday
hour: 0-23
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
self.cur.execute(f"""
SELECT
CAST(strftime('%w', timestamp) AS INTEGER) as day_of_week,
CAST(strftime('%H', timestamp) AS INTEGER) as hour,
COUNT(*) as activity_count
FROM cwa_user_activity
WHERE {combined_filter}
GROUP BY day_of_week, hour
ORDER BY day_of_week, hour
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting hourly activity heatmap: {e}")
return []
def get_reading_velocity(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns books read per week with data for moving average calculation.
Returns list of tuples: (week_label, books_read_count)
week_label format: 'YYYY-Www' (e.g., '2025-W01')
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
self.cur.execute(f"""
SELECT
strftime('%Y-W%W', timestamp) as week,
COUNT(DISTINCT item_id) as books_read
FROM cwa_user_activity
WHERE event_type = 'READ'
AND {combined_filter}
GROUP BY week
ORDER BY week
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting reading velocity: {e}")
return []
def get_format_preferences(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns format usage by user for stacked bar chart.
Returns list of tuples: (user_name, format, count)
"""
try:
# Build date filter
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
self.cur.execute(f"""
SELECT
COALESCE(user_name, 'Unknown User') as user_name,
UPPER(COALESCE(
json_extract(extra_data, '$.format'),
'Unknown'
)) as format,
COUNT(*) as count
FROM cwa_user_activity
WHERE event_type IN ('DOWNLOAD', 'READ')
AND {combined_filter}
GROUP BY user_name, format
ORDER BY user_name, count DESC
""")
return self.cur.fetchall()
except Exception as e:
print(f"[cwa-db] Error getting format preferences: {e}")
return []
def get_dashboard_stats(self, days=None, start_date=None, end_date=None, user_id=None):
"""Returns comprehensive activity stats for the user dashboard.
Args:
days: Number of days back from now (legacy support)
start_date: Start date string 'YYYY-MM-DD' (takes precedence over days)
end_date: End date string 'YYYY-MM-DD' (takes precedence over days)
user_id: Filter stats for specific user ID (optional)
"""
try:
# Use date range if provided, otherwise fall back to days
if start_date and end_date:
date_filter = f"timestamp BETWEEN date('{start_date}') AND date('{end_date}', '+1 day')"
else:
days = days or 30 # Default to 30 days
date_filter = f"timestamp >= date('now', '-{days} days')"
# Add user filter if provided
user_filter = self._build_user_filter(user_id)
combined_filter = date_filter + user_filter
# 1. Activity timeline - Daily counts by event type
self.cur.execute(f"""
SELECT date(timestamp) as day, event_type, COUNT(*) as count
FROM cwa_user_activity
WHERE {combined_filter}
GROUP BY day, event_type
ORDER BY day ASC
""")
timeline_data = self.cur.fetchall()
# 2. Top active users or most active days (depending on user filter)
if self._has_user_filter(user_id):
# Show most active days for specific user
self.cur.execute(f"""
SELECT date(timestamp) as day, COUNT(*) as activity_count
FROM cwa_user_activity
WHERE {combined_filter}
GROUP BY day
ORDER BY activity_count DESC
LIMIT 10
""")
top_users = self.cur.fetchall()
else:
# Show top active users across all users
self.cur.execute(f"""
SELECT user_id, COALESCE(user_name, 'Unknown User') as user_name, COUNT(*) as activity_count
FROM cwa_user_activity
WHERE {combined_filter}
GROUP BY user_id, user_name
ORDER BY activity_count DESC
LIMIT 10
""")
top_users = self.cur.fetchall()
# 3. Most popular books (reads + downloads + emails combined)
self.cur.execute(f"""
SELECT item_title, item_id, COUNT(*) as hits
FROM cwa_user_activity
WHERE item_id IS NOT NULL
AND event_type IN ('DOWNLOAD', 'READ', 'EMAIL')
AND {combined_filter}
GROUP BY item_id, item_title
ORDER BY hits DESC
LIMIT 10
""")
top_books = self.cur.fetchall()
# 4. Recent search terms
self.cur.execute(f"""
SELECT extra_data as search_term, timestamp, user_name
FROM cwa_user_activity
WHERE event_type = 'SEARCH'
AND extra_data IS NOT NULL
AND {combined_filter}
ORDER BY timestamp DESC
LIMIT 15
""")
recent_searches = self.cur.fetchall()
# 5. Download format distribution
self.cur.execute(f"""
SELECT
UPPER(COALESCE(
CASE WHEN json_valid(extra_data)
THEN json_extract(extra_data, '$.format')
ELSE extra_data
END,
'UNKNOWN'
)) as format,
COUNT(*) as count
FROM cwa_user_activity
WHERE event_type IN ('DOWNLOAD', 'EMAIL')
AND extra_data IS NOT NULL
AND {combined_filter}
GROUP BY format
ORDER BY count DESC
""")
format_distribution = self.cur.fetchall()
# 6. Event type breakdown (LOGIN, DOWNLOAD, READ, SEARCH, EMAIL)
self.cur.execute(f"""
SELECT event_type, COUNT(*) as count
FROM cwa_user_activity
WHERE {combined_filter}
GROUP BY event_type
ORDER BY count DESC
""")
event_breakdown = self.cur.fetchall()
# 7. Total activity metrics
if self._has_user_filter(user_id):
# For single user, show total logins instead of active users
self.cur.execute(f"""
SELECT
COUNT(*) as total_events,
COUNT(CASE WHEN event_type = 'LOGIN' THEN 1 END) as total_logins,
COUNT(DISTINCT CASE WHEN event_type IN ('DOWNLOAD', 'EMAIL') THEN item_id END) as unique_downloads,
COUNT(DISTINCT CASE WHEN event_type = 'READ' THEN item_id END) as unique_reads,
0 as active_users,
COUNT(CASE WHEN event_type IN ('DOWNLOAD', 'EMAIL') THEN 1 END) as total_downloads,
COUNT(CASE WHEN event_type = 'READ' THEN 1 END) as total_reads,
COUNT(CASE WHEN event_type = 'SEARCH' THEN 1 END) as total_searches
FROM cwa_user_activity
WHERE {combined_filter}
""")
else:
# For all users, show active user count
self.cur.execute(f"""
SELECT
COUNT(*) as total_events,
COUNT(CASE WHEN event_type = 'LOGIN' THEN 1 END) as total_logins,
COUNT(DISTINCT CASE WHEN event_type IN ('DOWNLOAD', 'EMAIL') THEN item_id END) as unique_downloads,
COUNT(DISTINCT CASE WHEN event_type = 'READ' THEN item_id END) as unique_reads,
COUNT(DISTINCT user_id) as active_users,
COUNT(CASE WHEN event_type IN ('DOWNLOAD', 'EMAIL') THEN 1 END) as total_downloads,
COUNT(CASE WHEN event_type = 'READ' THEN 1 END) as total_reads,
COUNT(CASE WHEN event_type = 'SEARCH' THEN 1 END) as total_searches
FROM cwa_user_activity
WHERE {combined_filter}
""")
totals = self.cur.fetchone()
return {
"timeline": timeline_data or [],
"top_users": top_users or [],
"top_books": top_books or [],
"recent_searches": recent_searches or [],
"format_distribution": format_distribution or [],
"event_breakdown": event_breakdown or [],
"totals": {
"total_events": totals[0] if totals else 0,
"total_logins": totals[1] if totals else 0,
"unique_downloads": totals[2] if totals else 0,
"unique_reads": totals[3] if totals else 0,
"active_users": totals[4] if totals else 0,
"total_downloads": totals[5] if totals else 0,
"total_reads": totals[6] if totals else 0,
"total_searches": totals[7] if totals else 0,
}
}
except Exception as e:
print(f"[cwa-db] Error getting dashboard stats: {e}")
import traceback
traceback.print_exc()
return {
"timeline": [],
"top_users": [],
"top_books": [],
"recent_searches": [],
"format_distribution": [],
"event_breakdown": [],
"totals": {
"total_events": 0,
"active_users": 0,
"unique_downloads": 0,
"unique_reads": 0,
"total_logins": 0,
"total_downloads": 0,
"total_reads": 0,
"total_searches": 0,
}
}
def invalidate_duplicate_cache(self):
"""Mark duplicate cache as needing refresh"""
try:
self.cur.execute("""
UPDATE cwa_duplicate_cache
SET scan_pending = 1
WHERE id = 1
""")
self.con.commit()
return True
except Exception as e:
print(f"[cwa-db] Error invalidating duplicate cache: {e}")
return False
def get_duplicate_cache(self):
"""Get cached duplicate scan results"""
import json
try:
self.cur.execute("""
SELECT scan_timestamp, duplicate_groups_json, total_count, scan_pending, last_scanned_book_id
FROM cwa_duplicate_cache
WHERE id = 1
""")
row = self.cur.fetchone()
if row and row[1]: # Has cached data
return {
'scan_timestamp': row[0],
'duplicate_groups': json.loads(row[1]),
'total_count': row[2],
'scan_pending': bool(row[3]),
'last_scanned_book_id': row[4]
}
return None
except Exception as e:
print(f"[cwa-db] Error getting duplicate cache: {e}")
return None
def update_duplicate_cache(self, duplicate_groups, total_count, max_book_id=None):
"""Update duplicate cache with fresh scan results
Args:
duplicate_groups: List of duplicate group dictionaries
total_count: Total number of duplicate groups found
max_book_id: Maximum book ID in metadata.db (optional, for incremental scanning)
"""
import json
from datetime import datetime
try:
# Serialize duplicate groups to JSON (extract only serializable data)
serializable_groups = []
for group in duplicate_groups:
serializable_group = {
'title': group.get('title', ''),
'author': group.get('author', ''),
'count': group.get('count', 0),
'group_hash': group.get('group_hash', ''),
'book_ids': [book.id for book in group.get('books', [])]
}
serializable_groups.append(serializable_group)
groups_json = json.dumps(serializable_groups)
# Update cache with optional max_book_id for incremental scanning
if max_book_id is not None:
self.cur.execute("""
UPDATE cwa_duplicate_cache
SET scan_timestamp = ?,
duplicate_groups_json = ?,
total_count = ?,
scan_pending = 0,
last_scanned_book_id = ?
WHERE id = 1
""", (datetime.now().isoformat(), groups_json, total_count, max_book_id))
else:
self.cur.execute("""
UPDATE cwa_duplicate_cache
SET scan_timestamp = ?,
duplicate_groups_json = ?,
total_count = ?,
scan_pending = 0
WHERE id = 1
""", (datetime.now().isoformat(), groups_json, total_count))
self.con.commit()
return True
except Exception as e:
print(f"[cwa-db] Error updating duplicate cache: {e}")
return False
def log_duplicate_resolution(self, group_hash, group_title, group_author, kept_book_id,
deleted_book_ids, strategy, trigger_type, user_id=None, notes=None):
"""Log a duplicate resolution to audit table"""
import json
try:
self.cur.execute("""
INSERT INTO cwa_duplicate_resolutions
(group_hash, group_title, group_author, kept_book_id, deleted_book_ids,
strategy, trigger_type, user_id, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (group_hash, group_title, group_author, kept_book_id,
json.dumps(deleted_book_ids), strategy, trigger_type, user_id, notes))
self.con.commit()
return True
except Exception as e:
print(f"[cwa-db] Error logging duplicate resolution: {e}")
return False
def get_resolution_history(self, limit=100):
"""Get recent resolution history"""
import json
try:
self.cur.execute("""
SELECT id, timestamp, group_hash, group_title, group_author,
kept_book_id, deleted_book_ids, strategy, trigger_type, user_id, notes
FROM cwa_duplicate_resolutions
ORDER BY timestamp DESC
LIMIT ?
""", (limit,))
results = []
for row in self.cur.fetchall():
results.append({
'id': row[0],
'timestamp': row[1],
'group_hash': row[2],
'group_title': row[3],
'group_author': row[4],
'kept_book_id': row[5],
'deleted_book_ids': json.loads(row[6]),
'strategy': row[7],
'trigger_type': row[8],
'user_id': row[9],
'notes': row[10]
})
return results
except Exception as e:
print(f"[cwa-db] Error getting resolution history: {e}")
return []
def main():
db = CWA_DB()
if __name__ == "__main__":
main()