Files
crocodilestick 95e59034e3 opds: add magic shelves to OPDS catalog + fix empty magic shelf feeds
Add /opds/magicshelfindex and /opds/magicshelf routes, include magic shelves in OPDS feed rendering
Fix OPDS magic shelf paging and cache behavior to avoid empty results
Update feed list rendering for magic shelf entries
opds: make root catalog per‑user with ordering + visibility
Replace static OPDS root with dynamic entry list
Store per‑user OPDS order/hidden entries in view_settings
Split Shelves vs Magic Shelves in the OPDS root
ui: add drag‑and‑drop OPDS ordering with per‑entry visibility toggles on /me
Drag/drop list modeled after Duplicate Format Priority Ranking
Toggle visibility per entry while preserving default fallbacks
2026-01-31 14:06:18 +01:00

2798 lines
129 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Calibre-Web Automated fork of Calibre-Web
# Copyright (C) 2018-2026 Calibre-Web contributors
# Copyright (C) 2024-2026 Calibre-Web Automated contributors
# SPDX-License-Identifier: GPL-3.0-or-later
# See CONTRIBUTORS for full list of authors.
import os
import json
import mimetypes
import chardet # dependency of requests
import copy
import importlib
import re
import zipfile
import xml.etree.ElementTree as ET
from flask import Blueprint, jsonify
from flask import request, redirect, send_from_directory, send_file, make_response, flash, abort, url_for, Response
from flask import session as flask_session
from flask_babel import gettext as _
from flask_babel import get_locale
from .cw_login import login_user, logout_user, current_user
from flask_limiter import RateLimitExceeded
from flask_limiter.util import get_remote_address
from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError
from sqlalchemy.sql.expression import text, func, false, not_, and_, or_
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.sql.functions import coalesce
from werkzeug.datastructures import Headers
from werkzeug.security import generate_password_hash, check_password_hash
from . import constants, logger, isoLanguages, services, helper
from . import db, ub, config, app
from . import calibre_db, kobo_sync_status
from .search import render_search_results, render_adv_search_results
from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download
from .helper import check_valid_domain, check_email, check_username, \
get_book_cover, get_series_cover_thumbnail, get_download_link, send_mail, generate_random_password, \
send_registration_mail, check_send_to_ereader, check_read_formats, tags_filters, reset_password, valid_email, \
edit_book_read_status, valid_password
from .pagination import Pagination
from .redirect import get_redirect_location
from .cw_babel import get_available_locale
from .usermanagement import login_required_if_no_ano
from .kobo_sync_status import remove_synced_book
from . import magic_shelf
from .render_template import render_title_template
from .kobo_sync_status import change_archived_books
from . import limiter
from .services.worker import WorkerThread
from .tasks_status import render_task_status
from .usermanagement import user_login_required
from .string_helper import strip_whitespaces
# CWA Imports
import sqlite3
import time
import time
import sys
sys.path.insert(1, '/app/calibre-web-automated/scripts/')
from cwa_db import CWA_DB
# Availability flags for optional integrations, computed once at import time.
# Each value is the truthiness of the matching attribute on the `services` package.
feature_support = {
    'ldap': bool(services.ldap),
    'goodreads': bool(services.goodreads_support),
    'kobo': bool(services.kobo),
    'hardcover' : bool(services.hardcover)
}
# Optional OAuth support: use the real module when it can be imported,
# otherwise install an inert stand-in exposing the same attributes.
try:
    from . import oauth_bb
    feature_support['oauth'] = True
except ImportError:
    feature_support['oauth'] = False

    # Mock oauth_bb module used when OAuth is not available; every helper is a no-op.
    class MockOAuth:
        oauth_check = {}
        oauthblueprints = []

        @staticmethod
        def register_user_with_oauth(*args, **kwargs):
            return None

        @staticmethod
        def logout_oauth_user(*args, **kwargs):
            return None

        @staticmethod
        def get_oauth_status(*args, **kwargs):
            return None

    oauth_bb = MockOAuth()

# Bind the helper functions directly since they don't change after import;
# `oauth_bb` is either the real module or the MockOAuth instance at this point.
register_user_with_oauth = oauth_bb.register_user_with_oauth
logout_oauth_user = oauth_bb.logout_oauth_user
get_oauth_status = oauth_bb.get_oauth_status
from functools import wraps
# `import importlib` alone does not guarantee that the `metadata` submodule is
# loaded, so import it explicitly before using `importlib.metadata.version`.
import importlib.metadata

try:
    from natsort import natsorted as sort
except ImportError:
    sort = sorted  # Just use regular sort then, may cause issues with badly named pages in cbz/cbr files


def _leading_version_numbers(version, width=3):
    """Return the first *width* numeric release components of *version*.

    Only the leading digits of each dot-separated component are used, so
    pre-release or post-release suffixes ("2.0.0b1", "2.0.0.post1") do not
    raise. Missing components are padded with zeros ("2.0" -> [2, 0, 0]).
    Parsing stops at the first component that does not start with a digit.
    """
    numbers = []
    for component in version.split('.')[:width]:
        digits = re.match(r'\d+', component)
        if digits is None:
            break
        numbers.append(int(digits.group()))
    return numbers + [0] * (width - len(numbers))


sql_version = importlib.metadata.version("sqlalchemy")
# True when the installed SQLAlchemy is 2.0.0 or newer (including 2.x pre-releases).
sqlalchemy_version2 = _leading_version_numbers(sql_version) >= [2, 0, 0]
# Reference point for the uptime reported by the health check endpoint.
_start_time = time.time()
@app.after_request
def add_security_headers(resp):
    """Attach the Content-Security-Policy and related security headers to *resp*.

    The policy is assembled from fragments that depend on the current endpoint,
    the request path and a few configuration switches, then the response is
    returned with the headers set.
    """
    endpoint = request.endpoint
    in_reader = endpoint == "web.read_book"
    gdrive_enabled = config.config_use_google_drive

    trusted_hosts = [host.strip() for host in config.config_trustedhosts.split(',') if host]
    sources = trusted_hosts + ["'self'", "'unsafe-inline'", "'unsafe-eval'"]

    fragments = ["default-src ", ' '.join(sources)]
    if in_reader and gdrive_enabled:
        fragments.append(" blob: ")
    fragments.append("; font-src 'self' data:")
    if in_reader:
        fragments.append(" blob: ")
    fragments.append("; img-src 'self'")
    if request.path.startswith("/author/") and config.config_use_goodreads:
        fragments.append(" images.gr-assets.com i.gr-assets.com s.gr-assets.com")
    if endpoint == "admin.hardcover_review_matches":
        fragments.append(" https:")
    fragments.append(" data:")
    if endpoint == "edit-book.show_edit_book" or gdrive_enabled:
        fragments.append(" *")
    if in_reader:
        fragments.append(" blob: ; style-src-elem 'self' blob: 'unsafe-inline'")
    fragments.append("; object-src 'none';")

    security_headers = (
        ('Content-Security-Policy', "".join(fragments)),
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'SAMEORIGIN'),
        ('X-XSS-Protection', '1; mode=block'),
        ('Strict-Transport-Security', 'max-age=31536000'),
    )
    for header_name, header_value in security_headers:
        resp.headers[header_name] = header_value
    return resp
# Flask blueprint that all routes in this module are registered on.
web = Blueprint('web', __name__)
# Module-level logger obtained from the project's logger helper.
log = logger.create()
# ################################### Login logic and rights management ###############################################
def download_required(f):
    """Decorator that aborts with 403 unless the current user has the download role."""
    @wraps(f)
    def inner(*args, **kwargs):
        if not current_user.role_download():
            abort(403)
        return f(*args, **kwargs)
    return inner
def viewer_required(f):
    """Decorator that aborts with 403 unless the current user has the viewer role."""
    @wraps(f)
    def inner(*args, **kwargs):
        if not current_user.role_viewer():
            abort(403)
        return f(*args, **kwargs)
    return inner
# ################################### data provider functions #########################################################
@web.route("/ajax/emailstat")
@user_login_required
def get_email_status_json():
    """Return the rendered status of the worker thread's tasks as JSON."""
    worker = WorkerThread.get_instance()
    return jsonify(render_task_status(worker.tasks))
@web.route("/ajax/bookmark/<int:book_id>/<book_format>", methods=['POST'])
@user_login_required
def set_bookmark(book_id, book_format):
    """Replace the current user's bookmark for one format of a book.

    Any existing bookmark for (user, book, format) is deleted first. An empty
    "bookmark" form value only removes it (204); otherwise the new key is
    stored (201).
    """
    new_key = request.form["bookmark"]
    own_bookmark = and_(ub.Bookmark.user_id == int(current_user.id),
                        ub.Bookmark.book_id == book_id,
                        ub.Bookmark.format == book_format)
    ub.session.query(ub.Bookmark).filter(own_bookmark).delete()

    if new_key:
        ub.session.merge(ub.Bookmark(user_id=current_user.id,
                                     book_id=book_id,
                                     format=book_format,
                                     bookmark_key=new_key))
        ub.session_commit("Bookmark for user {} in book {} created".format(current_user.id, book_id))
        return "", 201

    # Nothing to store: commit the deletion only.
    ub.session_commit()
    return "", 204
@web.route("/ajax/toggleread/<int:book_id>", methods=['POST'])
@user_login_required
def toggle_read(book_id):
    """Change the read status of a book via edit_book_read_status.

    A truthy result from the helper is treated as an error message and sent
    back with status 400; a falsy result is returned unchanged.
    """
    message = edit_book_read_status(book_id)
    return (message, 400) if message else message
@web.route("/ajax/togglearchived/<int:book_id>", methods=['POST'])
@user_login_required
def toggle_archived(book_id):
    """Change the archived state of a book and drop it from the synced-books list."""
    log_message = "Book {} archive bit toggled".format(book_id)
    change_archived_books(book_id, message=log_message)
    # Remove book from synced books list to force resync (?)
    remove_synced_book(book_id)
    return ""
@web.route("/ajax/view", methods=["POST"])
@login_required_if_no_ano
def update_view():
    """Persist view settings posted as JSON for the current user.

    The payload is expected to be a mapping of element -> {param: value};
    every pair is stored through current_user.set_view_property.

    Returns:
        ("1", 200) on success, ("Invalid request", 400) when the payload
        cannot be processed.
    """
    to_save = request.get_json()
    try:
        for element in to_save:
            for param in to_save[element]:
                current_user.set_view_property(element, param, to_save[element][param])
    except Exception as ex:
        # "%s" instead of the former "%e": "%e" is a float conversion, so the
        # logging call itself failed and the real error was never recorded.
        log.error("Could not save view_settings: %r %r: %s", request, to_save, ex)
        return "Invalid request", 400
    return "1", 200
'''
@web.route("/ajax/getcomic/<int:book_id>/<book_format>/<int:page>")
@user_login_required
def get_comic_book(book_id, book_format, page):
book = calibre_db.get_book(book_id)
if not book:
return "", 204
else:
for bookformat in book.data:
if bookformat.format.lower() == book_format.lower():
cbr_file = os.path.join(config.config_calibre_dir, book.path, bookformat.name) + "." + book_format
if book_format in ("cbr", "rar"):
if feature_support['rar'] == True:
rarfile.UNRAR_TOOL = config.config_rarfile_location
try:
rf = rarfile.RarFile(cbr_file)
names = sort(rf.namelist())
extract = lambda page: rf.read(names[page])
except:
# rarfile not valid
log.error('Unrar binary not found, or unable to decompress file %s', cbr_file)
return "", 204
else:
log.info('Unrar is not supported please install python rarfile extension')
# no support means return nothing
return "", 204
elif book_format in ("cbz", "zip"):
zf = zipfile.ZipFile(cbr_file)
names=sort(zf.namelist())
extract = lambda page: zf.read(names[page])
elif book_format in ("cbt", "tar"):
tf = tarfile.TarFile(cbr_file)
names=sort(tf.getnames())
extract = lambda page: tf.extractfile(names[page]).read()
else:
log.error('unsupported comic format')
return "", 204
b64 = codecs.encode(extract(page), 'base64').decode()
ext = names[page].rpartition('.')[-1]
if ext not in ('png', 'gif', 'jpg', 'jpeg', 'webp'):
ext = 'png'
extractedfile="data:image/" + ext + ";base64," + b64
fileData={"name": names[page], "page":page, "last":len(names)-1, "content": extractedfile}
return make_response(json.dumps(fileData))
return "", 204
'''
# ################################### Typeahead ##################################################################
@web.route("/get_authors_json", methods=['GET'])
@login_required_if_no_ano
def get_authors_json():
    """Typeahead lookup for authors matching the `q` query argument."""
    search_term = request.args.get('q')
    return calibre_db.get_typeahead(db.Authors, search_term, ('|', ','))
@web.route("/get_publishers_json", methods=['GET'])
@login_required_if_no_ano
def get_publishers_json():
    """Typeahead lookup for publishers matching the `q` query argument."""
    search_term = request.args.get('q')
    return calibre_db.get_typeahead(db.Publishers, search_term, ('|', ','))
@web.route("/get_tags_json", methods=['GET'])
@login_required_if_no_ano
def get_tags_json():
    """Typeahead lookup for tags matching the `q` query argument, with tags_filters() applied."""
    search_term = request.args.get('q')
    return calibre_db.get_typeahead(db.Tags, search_term, tag_filter=tags_filters())
@web.route("/get_series_json", methods=['GET'])
@login_required_if_no_ano
def get_series_json():
    """Typeahead lookup for series matching the `q` query argument."""
    search_term = request.args.get('q')
    return calibre_db.get_typeahead(db.Series, search_term)
@web.route("/get_languages_json", methods=['GET'])
@login_required_if_no_ano
def get_languages_json():
    """Typeahead lookup for language names in the current locale.

    Returns a JSON list of at most five ``{"name": ...}`` objects. Names that
    start with the (case-insensitive) query come first, followed by names that
    merely contain it.
    """
    query = (request.args.get('q') or '').lower()
    language_names = isoLanguages.get_language_names(get_locale())
    names = [name for _code, name in language_names.items()]

    prefix_matches = [name for name in names if name.lower().startswith(query)]
    substring_matches = [name for name in names if query in name.lower()]

    # dict.fromkeys removes duplicates while keeping first-seen order. The
    # previous set()-based de-duplication shuffled the results, losing the
    # prefix-first ranking, and could leave fewer than five suggestions.
    ranked = list(dict.fromkeys(prefix_matches + substring_matches))
    return json.dumps([dict(name=name) for name in ranked[:5]])
@web.route("/get_matching_tags", methods=['GET'])
@login_required_if_no_ano
def get_matching_tags():
    """Return, as JSON, the ids of all tags carried by books matching the request.

    Books are narrowed by author and title substrings and by the lists of tag
    ids that must (`include_tag`) or must not (`exclude_tag`) be present.
    """
    books = calibre_db.session.query(db.Books).filter(calibre_db.common_filters(True))
    calibre_db.create_functions()
    # calibre_db.session.connection().connection.connection.create_function("lower", 1, db.lcase)

    author_pattern = "%" + (request.args.get('authors') or '') + "%"
    title_pattern = "%" + (request.args.get('title') or '') + "%"
    books = books.filter(db.Books.authors.any(func.lower(db.Authors.name).ilike(author_pattern)),
                         func.lower(db.Books.title).ilike(title_pattern))

    for required_tag in request.args.getlist('include_tag'):
        books = books.filter(db.Books.tags.any(db.Tags.id == required_tag))
    for forbidden_tag in request.args.getlist('exclude_tag'):
        books = books.filter(not_(db.Books.tags.any(db.Tags.id == forbidden_tag)))

    # Unique tag ids in order of first appearance.
    tag_ids = list(dict.fromkeys(tag.id for book in books for tag in book.tags))
    return json.dumps({'tags': tag_ids})
def generate_char_list(entries):  # data_colum, db_link):
    """Return the distinct upper-cased first characters of ``entry[0].name``.

    Characters are listed in the order in which they first occur in *entries*.
    """
    initials = (entry[0].name[0].upper() for entry in entries)
    return list(dict.fromkeys(initials))
def query_char_list(data_colum, db_link):
    """Query the distinct upper-cased first characters of *data_colum*.

    The column is joined through *db_link* to the books table and restricted
    by the common filters; each result row exposes the character as ``char``.
    """
    def initial():
        # Fresh expression per use, mirroring how the query selects and groups.
        return func.upper(func.substr(data_colum, 1, 1))

    query = calibre_db.session.query(initial().label('char'))
    query = query.join(db_link).join(db.Books)
    query = query.filter(calibre_db.common_filters())
    return query.group_by(initial()).all()
def get_sort_function(sort_param, data):
    """Translate a sort keyword into an ORDER BY list.

    'stored' loads the keyword previously saved for the view *data*; any other
    value is saved as the new stored keyword. Unknown keywords fall back to
    newest-first ordering.

    Returns:
        (order, sort_param): the list of order clauses and the effective
        keyword ("new" when no keyword is available).
    """
    if sort_param == 'stored':
        sort_param = current_user.get_view_property(data, 'stored')
    else:
        current_user.set_view_property(data, 'stored', sort_param)

    # Builders are lazy so only the selected ordering is constructed.
    builders = {
        'pubnew': lambda: [db.Books.pubdate.desc()],
        'pubold': lambda: [db.Books.pubdate],
        'abc': lambda: [db.Books.sort],
        'zyx': lambda: [db.Books.sort.desc()],
        'new': lambda: [db.Books.timestamp.desc()],
        'old': lambda: [db.Books.timestamp],
        'authaz': lambda: [db.Books.author_sort.asc(), db.Series.name, db.Books.series_index],
        'authza': lambda: [db.Books.author_sort.desc(), db.Series.name.desc(),
                           db.Books.series_index.desc()],
        'seriesasc': lambda: [db.Books.series_index.asc()],
        'seriesdesc': lambda: [db.Books.series_index.desc()],
        'hotdesc': lambda: [func.count(ub.Downloads.book_id).desc()],
        'hotasc': lambda: [func.count(ub.Downloads.book_id).asc()],
    }
    # Only strings can name an ordering; anything else takes the default.
    builder = builders.get(sort_param) if isinstance(sort_param, str) else None
    order = builder() if builder is not None else [db.Books.timestamp.desc()]

    if sort_param is None:
        sort_param = "new"
    return order, sort_param
def cwa_get_library_location(dirs_file: str = '/app/calibre-web-automated/dirs.json') -> str:
    """Return the Calibre library directory recorded in the CWA dirs file.

    Args:
        dirs_file: Path of the JSON file holding the directory mapping.
            Defaults to the location used inside the CWA installation.

    Returns:
        The value stored under the ``calibre_library_dir`` key.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if ``calibre_library_dir`` is missing.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the locale.
    with open(dirs_file, 'r', encoding='utf-8') as f:
        dirs: dict[str, str] = json.load(f)
    return dirs['calibre_library_dir']
def cwa_get_num_books_in_library() -> int:
    """Count the rows of the ``books`` table in the library's metadata.db.

    The query is attempted up to three times when SQLite reports the database
    as locked. This is a best-effort helper: any failure yields 0.

    Returns:
        The number of books, or 0 if it could not be determined.
    """
    try:
        # Path to user's Calibre library's metadata.db
        db_path = os.path.join(cwa_get_library_location(), "metadata.db")
        attempts_left = 3
        while True:
            try:
                conn = sqlite3.connect(db_path, timeout=30)
                try:
                    return conn.execute("SELECT COUNT(*) FROM books").fetchone()[0]
                finally:
                    # Always release the handle; previously it leaked whenever
                    # the query raised.
                    conn.close()
            except sqlite3.OperationalError as e:
                # Retry only transient lock errors, and only while attempts remain.
                if 'locked' in str(e).lower() and attempts_left > 1:
                    time.sleep(0.1)
                    attempts_left -= 1
                    continue
                raise
    except Exception:
        return 0
def render_books_list(data, sort_param, book_id, page):
    """Dispatch a book-list request to the renderer for the view *data*.

    Args:
        data: Name of the requested view ("rated", "author", "search", ...).
            Unknown or empty names render the default list of all books.
        sort_param: Sort keyword, resolved through get_sort_function.
        book_id: Identifier whose meaning depends on the view (author id,
            tag id, user id, magic shelf id, ...).
        page: 1-based page number.

    Returns:
        Whatever the selected renderer returns.
    """
    order = get_sort_function(sort_param, data)

    if data == "rated":
        return render_rated_books(page, book_id, order=order)
    if data == "discover":
        return render_discover_books(book_id)
    if data == "unread":
        return render_read_books(page, False, order=order)
    if data == "read":
        return render_read_books(page, True, order=order)
    if data == "hot":
        return render_hot_books(page, order)
    if data == "download":
        return render_downloaded_books(page, order, book_id)
    if data == "author":
        return render_author_books(page, book_id, order)
    if data == "publisher":
        return render_publisher_books(page, book_id, order)
    if data == "series":
        return render_series_books(page, book_id, order)
    if data == "ratings":
        return render_ratings_books(page, book_id, order)
    if data == "formats":
        return render_formats_books(page, book_id, order)
    if data == "category":
        return render_category_books(page, book_id, order)
    if data == "language":
        return render_language_books(page, book_id, order)
    if data == "archived":
        return render_archived_books(page, order)
    if data == "search":
        term = request.args.get('query', None)
        offset = int(int(config.config_books_per_page) * (page - 1))
        return render_search_results(term, offset, order, config.config_books_per_page)
    if data == "advsearch":
        term = json.loads(flask_session.get('query', '{}'))
        offset = int(int(config.config_books_per_page) * (page - 1))
        return render_adv_search_results(term, offset, order, config.config_books_per_page)
    if data == "magicshelf":
        return render_magic_shelf(book_id, sort_param, page)

    # Default view: all books.
    website = data or "newest"
    entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, True, order[0],
                                                            True, config.config_read_column,
                                                            db.books_series_link,
                                                            db.Books.id == db.books_series_link.c.book,
                                                            db.Series)
    try:
        book_count = pagination.total_count
    except Exception:
        # Fall back to counting directly in metadata.db when the pagination
        # object cannot supply a total.
        book_count = cwa_get_num_books_in_library()
    # Translate the constant label only and append the count afterwards (as
    # render_read_books does): an f-string inside _() produces a different
    # msgid for every count, which can never match a catalog entry.
    title = _('Books') + ' (' + str(book_count) + ')'
    return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
                                 title=title, page=website, order=order[1])
def render_rated_books(page, book_id, order):
    """Render books whose rating value exceeds 9; 404 when the sidebar entry is hidden."""
    if not current_user.check_visibility(constants.SIDEBAR_BEST_RATED):
        abort(404)

    top_rated = db.Books.ratings.any(db.Ratings.rating > 9)
    entries, random, pagination = calibre_db.fill_indexpage(
        page, 0, db.Books, top_rated, order[0],
        True, config.config_read_column,
        db.books_series_link,
        db.Books.id == db.books_series_link.c.book,
        db.Series)
    return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
                                 id=book_id, title=_("Top Rated Books"), page="rated", order=order[1])
def render_discover_books(book_id):
    """Render a single page of randomly ordered books; 404 when the sidebar entry is hidden."""
    if not current_user.check_visibility(constants.SIDEBAR_RANDOM):
        abort(404)

    random_order = [func.randomblob(2)]
    entries, __, ___ = calibre_db.fill_indexpage(1, 0, db.Books, True, random_order,
                                                 join_archive_read=True,
                                                 config_read_column=config.config_read_column)
    # Pagination built for page 1 with the total equal to the page size.
    per_page = config.config_books_per_page
    pagination = Pagination(1, per_page, per_page)
    return render_title_template('index.html', random=false(), entries=entries, pagination=pagination,
                                 id=book_id, title=_("Discover (Random Books)"), page="discover")
def render_hot_books(page, order):
    """Render books ranked by download count; 404 when the sidebar entry is hidden.

    Any sort keyword other than 'hotasc'/'hotdesc' is replaced by descending
    download count.
    """
    if not current_user.check_visibility(constants.SIDEBAR_HOT):
        abort(404)

    if order[1] not in ['hotasc', 'hotdesc']:
        order = [func.count(ub.Downloads.book_id).desc()], 'hotdesc'

    if current_user.show_detail_random():
        random = (calibre_db.generate_linked_query(config.config_read_column, db.Books)
                  .filter(calibre_db.common_filters())
                  .order_by(func.random())
                  .limit(config.config_random_books)
                  .all())
    else:
        random = false()

    first_row = int(config.config_books_per_page) * (page - 1)
    # Total number of distinct downloaded books, used for pagination.
    total_hot_books = ub.session.query(func.count(ub.Downloads.book_id.distinct())).scalar()
    # Ids of the books on the requested page, in ranking order.
    ranked_ids = (ub.session.query(ub.Downloads.book_id)
                  .group_by(ub.Downloads.book_id)
                  .order_by(*order[0])
                  .offset(first_row)
                  .limit(config.config_books_per_page))
    hot_book_ids = [row[0] for row in ranked_ids]

    entries = []
    if hot_book_ids:
        # One query for all book details, then restore the ranking order.
        found = (calibre_db.generate_linked_query(config.config_read_column, db.Books)
                 .filter(calibre_db.common_filters())
                 .filter(db.Books.id.in_(hot_book_ids))
                 .all())
        rows_by_id = {row.Books.id: row for row in found}
        for book_id in hot_book_ids:
            if book_id not in rows_by_id:
                # This book might have been deleted from calibre but still in downloads table
                ub.delete_download(book_id)
                continue
            entries.append(rows_by_id[book_id])

    pagination = Pagination(page, config.config_books_per_page, total_hot_books)
    return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
                                 title=_("Hot Books (Most Downloaded)"), page="hot", order=order[1])
def render_downloaded_books(page, order, user_id):
    """Render the books downloaded by a user.

    Admins may inspect any user id; everyone else always sees their own
    downloads. Responds with 404 when the sidebar entry is hidden or the user
    does not exist.
    """
    user_id = int(user_id) if current_user.role_admin() else current_user.id
    user = ub.session.query(ub.User).filter(ub.User.id == user_id).first()
    if not (current_user.check_visibility(constants.SIDEBAR_DOWNLOAD) and user):
        abort(404)

    entries, random, pagination = calibre_db.fill_indexpage(
        page, 0, db.Books,
        ub.Downloads.user_id == user_id,
        order[0],
        True, config.config_read_column,
        db.books_series_link,
        db.Books.id == db.books_series_link.c.book,
        db.Series,
        ub.Downloads, db.Books.id == ub.Downloads.book_id)

    # Drop download records of books no longer returned by the common filters.
    for entry in entries:
        still_present = (calibre_db.session.query(db.Books)
                         .filter(calibre_db.common_filters())
                         .filter(db.Books.id == entry.Books.id)
                         .first())
        if not still_present:
            ub.delete_download(entry.Books.id)

    return render_title_template('index.html',
                                 random=random,
                                 entries=entries,
                                 pagination=pagination,
                                 id=user_id,
                                 title=_("Downloaded books by %(user)s", user=user.name),
                                 page="download",
                                 order=order[1])
def render_author_books(page, author_id, order):
    """Render the books of one author, with Goodreads details when enabled.

    Redirects to the index with an error flash when no books are found.
    """
    by_author = db.Books.authors.any(db.Authors.id == author_id)
    ordering = [order[0][0], db.Series.name, db.Books.series_index]
    entries, __, pagination = calibre_db.fill_indexpage(
        page, 0, db.Books, by_author, ordering,
        True, config.config_read_column,
        db.books_series_link,
        db.books_series_link.c.book == db.Books.id,
        db.Series)
    if entries is None or len(entries) == 0:
        flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
              category="error")
        return redirect(url_for("web.index"))

    # Primary-key lookup; the API differs between SQLAlchemy 1.x and 2.x.
    author = (calibre_db.session.get(db.Authors, author_id) if sqlalchemy_version2
              else calibre_db.session.query(db.Authors).get(author_id))
    author_name = author.name.replace('|', ',')

    author_info, other_books = None, []
    if services.goodreads_support and config.config_use_goodreads:
        author_info = services.goodreads_support.get_author_info(author_name)
        local_books = [entry.Books for entry in entries]
        other_books = services.goodreads_support.get_other_books(author_info, local_books)

    return render_title_template('author.html', entries=entries, pagination=pagination, id=author_id,
                                 title=_("Author: %(name)s", name=author_name), author=author_info,
                                 other_books=other_books, page="author", order=order[1])
def render_publisher_books(page, book_id, order):
    """Render the books of one publisher; the id '-1' selects books without publisher.

    Responds with 404 when the publisher id is unknown.
    """
    ordering = [db.Series.name, order[0][0], db.Books.series_index]
    if book_id == '-1':
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Publishers.name == None,  # noqa: E711 - SQLAlchemy NULL comparison
            ordering,
            True, config.config_read_column,
            db.books_publishers_link,
            db.Books.id == db.books_publishers_link.c.book,
            db.Publishers,
            db.books_series_link,
            db.Books.id == db.books_series_link.c.book,
            db.Series)
        publisher = _("None")
    else:
        record = calibre_db.session.query(db.Publishers).filter(db.Publishers.id == book_id).first()
        if not record:
            abort(404)
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Books.publishers.any(db.Publishers.id == book_id),
            ordering,
            True, config.config_read_column,
            db.books_series_link,
            db.Books.id == db.books_series_link.c.book,
            db.Series)
        publisher = record.name
    return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
                                 id=book_id,
                                 title=_("Publisher: %(name)s", name=publisher),
                                 page="publisher",
                                 order=order[1])
def render_series_books(page, book_id, order):
    """Render the books of one series; the id '-1' selects books without series.

    Responds with 404 when the series id is unknown.
    """
    ordering = [order[0][0]]
    if book_id == '-1':
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Series.name == None,  # noqa: E711 - SQLAlchemy NULL comparison
            ordering,
            True, config.config_read_column,
            db.books_series_link,
            db.Books.id == db.books_series_link.c.book,
            db.Series)
        series_name = _("None")
    else:
        record = calibre_db.session.query(db.Series).filter(db.Series.id == book_id).first()
        if not record:
            abort(404)
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Books.series.any(db.Series.id == book_id),
            ordering,
            True, config.config_read_column)
        series_name = record.name
    return render_title_template('index.html', random=random, pagination=pagination, entries=entries,
                                 id=book_id, title=_("Series: %(serie)s", serie=series_name),
                                 page="series", order=order[1])
def render_ratings_books(page, book_id, order):
    """Render the books with one rating; the id '-1' selects unrated books.

    Responds with 404 when the rating id is unknown.
    """
    ordering = [order[0][0]]
    if book_id == '-1':
        unrated = coalesce(db.Ratings.rating, 0) < 1
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books, unrated, ordering,
            True, config.config_read_column,
            db.books_ratings_link,
            db.Books.id == db.books_ratings_link.c.book,
            db.Ratings)
        title = _("Rating: None")
    else:
        record = calibre_db.session.query(db.Ratings).filter(db.Ratings.id == book_id).first()
        if not record:
            abort(404)
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Books.ratings.any(db.Ratings.id == book_id),
            ordering,
            True, config.config_read_column)
        # The stored rating value is halved to obtain the star count shown.
        title = _("Rating: %(rating)s stars", rating=int(record.rating / 2))
    return render_title_template('index.html', random=random, pagination=pagination, entries=entries,
                                 id=book_id, title=title, page="ratings", order=order[1])
def render_formats_books(page, book_id, order):
    """Render the books available in one file format; '-1' selects books without format.

    The format name is matched in upper case. Responds with 404 when no data
    row has the requested format.
    """
    ordering = [order[0][0]]
    if book_id == '-1':
        name = _("None")
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Data.format == None,  # noqa: E711 - SQLAlchemy NULL comparison
            ordering,
            True, config.config_read_column,
            db.Data)
    else:
        record = calibre_db.session.query(db.Data).filter(db.Data.format == book_id.upper()).first()
        if not record:
            abort(404)
        name = record.format
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Books.data.any(db.Data.format == book_id.upper()),
            ordering,
            True, config.config_read_column)
    return render_title_template('index.html', random=random, pagination=pagination, entries=entries,
                                 id=book_id,
                                 title=_("File format: %(format)s", format=name),
                                 page="formats",
                                 order=order[1])
def render_category_books(page, book_id, order):
    """Render the books carrying one tag; the id '-1' selects untagged books.

    Responds with 404 when the tag id is unknown.
    """
    ordering = [order[0][0], db.Series.name, db.Books.series_index]
    if book_id == '-1':
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Tags.name == None,  # noqa: E711 - SQLAlchemy NULL comparison
            ordering,
            True, config.config_read_column,
            db.books_tags_link,
            db.Books.id == db.books_tags_link.c.book,
            db.Tags,
            db.books_series_link,
            db.Books.id == db.books_series_link.c.book,
            db.Series)
        tagsname = _("None")
    else:
        record = calibre_db.session.query(db.Tags).filter(db.Tags.id == book_id).first()
        if not record:
            abort(404)
        # Issue #906: Pass viewing_tag_id to allow this tag even if not in allowed tags
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Books.tags.any(db.Tags.id == book_id),
            ordering,
            True, config.config_read_column,
            db.books_series_link,
            db.Books.id == db.books_series_link.c.book,
            db.Series,
            viewing_tag_id=book_id)
        tagsname = record.name
    return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
                                 id=book_id, title=_("Category: %(name)s", name=tagsname),
                                 page="category", order=order[1])
def render_language_books(page, name, order):
    """Render the books in one language; the code "none" selects books without language.

    Args:
        page: 1-based page number.
        name: Language code, or "none" (any letter case) for books without a
            language.
        order: (order_clauses, sort_keyword) pair as returned by
            get_sort_function.

    Responds with 404 when the language code is unknown.
    """
    # Decide once, case-insensitively. Previously the title used a
    # case-insensitive test but the query a case-sensitive one, so "None"
    # was titled "None" yet filtered on a language code "None".
    without_language = name.lower() == "none"
    try:
        if without_language:
            lang_name = _("None")
        else:
            lang_name = isoLanguages.get_language_name(get_locale(), name)
            if lang_name == "Unknown":
                abort(404)
    except KeyError:
        abort(404)

    if without_language:
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Languages.lang_code == None,  # noqa: E711 - SQLAlchemy NULL comparison
            [order[0][0]],
            True, config.config_read_column,
            db.books_languages_link,
            db.Books.id == db.books_languages_link.c.book,
            db.Languages)
    else:
        entries, random, pagination = calibre_db.fill_indexpage(
            page, 0, db.Books,
            db.Books.languages.any(db.Languages.lang_code == name),
            [order[0][0]],
            True, config.config_read_column)
    return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
                                 id=name, title=_("Language: %(name)s", name=lang_name),
                                 page="language", order=order[1])
def render_read_books(page, are_read, as_xml=False, order=None):
    """Render (or return) the books the current user has read or not read.

    The read state comes from the application's own ReadBook table, or from
    the configured custom column when config.config_read_column is set.

    Args:
        page: 1-based page number.
        are_read: True for read books, False for unread books.
        as_xml: When True, return ``(entries, pagination)`` instead of a
            rendered page.
        order: Optional (order_clauses, sort_keyword) pair; without it no
            explicit ordering is passed on.

    Returns:
        The rendered page, or ``(entries, pagination)`` when *as_xml* is set.
        If the configured custom column is missing: a redirect to the index
        (HTML) or an empty list (XML).
    """
    sort_param = order[0] if order else []
    # Guarded like sort_param above: indexing the default order=None raised TypeError.
    # NOTE(review): None is passed to the template as "no sort keyword" - confirm it copes.
    sort_name = order[1] if order else None

    if not config.config_read_column:
        if are_read:
            db_filter = and_(ub.ReadBook.user_id == int(current_user.id),
                             ub.ReadBook.read_status == ub.ReadBook.STATUS_FINISHED)
        else:
            db_filter = coalesce(ub.ReadBook.read_status, 0) != ub.ReadBook.STATUS_FINISHED
    else:
        try:
            read_value = db.cc_classes[config.config_read_column].value
            if are_read:
                db_filter = read_value == True  # noqa: E712 - SQLAlchemy comparison
            else:
                db_filter = coalesce(read_value, False) != True  # noqa: E712 - SQLAlchemy comparison
        except (KeyError, AttributeError, IndexError):
            log.error("Custom Column No.{} does not exist in calibre database".format(config.config_read_column))
            if not as_xml:
                flash(_("Custom Column No.%(column)d does not exist in calibre database",
                        column=config.config_read_column),
                      category="error")
                return redirect(url_for("web.index"))
            # NOTE(review): differs in shape from the (entries, pagination)
            # pair returned on success - verify against the XML callers.
            return []  # ToDo: Handle error Case for opds

    entries, random, pagination = calibre_db.fill_indexpage(page, 0,
                                                            db.Books,
                                                            db_filter,
                                                            sort_param,
                                                            True, config.config_read_column,
                                                            db.books_series_link,
                                                            db.Books.id == db.books_series_link.c.book,
                                                            db.Series)
    if as_xml:
        return entries, pagination

    if are_read:
        name = _('Read Books') + ' (' + str(pagination.total_count) + ')'
        page_name = "read"
    else:
        name = _('Unread Books') + ' (' + str(pagination.total_count) + ')'
        page_name = "unread"
    return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
                                 title=name, page=page_name, order=sort_name)
def render_archived_books(page, sort_param):
    """Render the books the current user has archived.

    *sort_param* is the (order_clauses, sort_keyword) pair; the title carries
    the number of archived book ids found for the user.
    """
    order = sort_param[0] or []
    archived_rows = (ub.session.query(ub.ArchivedBook)
                     .filter(ub.ArchivedBook.user_id == int(current_user.id))
                     .filter(ub.ArchivedBook.is_archived == True)  # noqa: E712 - SQLAlchemy comparison
                     .all())
    archived_book_ids = [row.book_id for row in archived_rows]

    entries, random, pagination = calibre_db.fill_indexpage_with_archived_books(
        page, db.Books,
        0,
        db.Books.id.in_(archived_book_ids),
        order,
        True,
        True, config.config_read_column)

    title = _('Archived Books') + ' (' + str(len(archived_book_ids)) + ')'
    return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
                                 title=title, page="archived", order=sort_param[1])
@web.route("/magicshelf/<int:shelf_id>", defaults={"sort_param": "stored", 'page': 1})
@web.route("/magicshelf/<int:shelf_id>/<sort_param>", defaults={'page': 1})
@web.route("/magicshelf/<int:shelf_id>/<sort_param>/<int:page>")
@login_required_if_no_ano
def render_magic_shelf(shelf_id, sort_param, page):
    """Render a magic shelf with proper pagination and sorting.

    Args:
        shelf_id: Primary key of the magic shelf.
        sort_param: Sort keyword, resolved through get_sort_function.
        page: 1-based page number.

    Responds with 404 when the shelf does not exist and 403 when it is
    neither owned by the current user nor public. If the books cannot be
    loaded, an error is flashed and the user is redirected to the index.
    Passing ``?refresh=1`` bypasses the shelf cache.
    """
    # Primary-key lookup; branch on the SQLAlchemy version the same way
    # render_author_books does, since Query.get() is legacy API in 2.x.
    if sqlalchemy_version2:
        shelf = ub.session.get(ub.MagicShelf, shelf_id)
    else:
        shelf = ub.session.query(ub.MagicShelf).get(shelf_id)
    if not shelf:
        log.warning(f"Magic shelf {shelf_id} not found")
        abort(404)

    # Check access - users can view their own shelves OR public shelves
    if shelf.user_id != current_user.id and shelf.is_public != 1:
        log.warning(f"User {current_user.id} attempted to access private magic shelf {shelf_id} owned by {shelf.user_id}")
        abort(403)

    # Get sort order using the same function as other book lists
    order = get_sort_function(sort_param, "magicshelf")
    # Get pagination settings
    per_page = config.config_books_per_page or 20
    # order[0] is the list of order clauses
    sort_order = order[0] if order and len(order) > 0 else []
    # Check for cache bypass
    bypass_cache = request.args.get('refresh') == '1'

    # Get books with pagination
    try:
        books, total_count = magic_shelf.get_books_for_magic_shelf(
            shelf_id,
            page=page,
            page_size=per_page,
            sort_order=sort_order,
            sort_param=sort_param,
            bypass_cache=bypass_cache
        )
        log.debug(f"Magic shelf {shelf_id} returned {len(books)} books out of {total_count} total")

        # Log activity (best effort: a failure here must not break the page)
        try:
            # NOTE(review): the module header imports CWA_DB from `cwa_db`;
            # confirm that `scripts.cwa_db` is importable in deployment.
            from scripts.cwa_db import CWA_DB
            cwa_db = CWA_DB()
            cwa_db.log_activity(
                user_id=current_user.id,
                user_name=current_user.name,
                event_type='MAGIC_SHELF_VIEW',
                item_id=shelf_id,
                item_title=shelf.name,
                extra_data=json.dumps({'shelf_name': shelf.name, 'shelf_type': 'magic'})
            )
        except Exception as e:
            log.error(f"Failed to log magic shelf activity: {e}")
    except Exception as e:
        log.error(f"Error retrieving books for magic shelf {shelf_id}: {e}")
        flash(_("Error loading magic shelf"), category="error")
        return redirect(url_for('web.index'))

    # Create proper pagination object (Pagination is imported at module level)
    pagination = Pagination(page, per_page, total_count)

    # Wrap books in entry objects with .Books attribute for template compatibility
    class Entry:
        def __init__(self, book):
            self.Books = book

    entries = [Entry(book) for book in books]

    # Check if this shelf is hidden by current user (for public shelves)
    is_hidden = False
    if shelf.user_id != current_user.id:
        is_hidden = ub.session.query(ub.HiddenMagicShelfTemplate).filter(
            ub.HiddenMagicShelfTemplate.user_id == current_user.id,
            ub.HiddenMagicShelfTemplate.shelf_id == shelf_id
        ).first() is not None

    return render_title_template('index.html',
                                 entries=entries,
                                 pagination=pagination,
                                 title=_("Magic Shelf&nbsp&nbsp&nbsp—&nbsp&nbsp&nbsp%(icon)s %(name)s", icon=shelf.icon, name=shelf.name),
                                 page="magicshelf",
                                 shelf=shelf,
                                 is_hidden_shelf=is_hidden,
                                 id=shelf_id,
                                 order=order[1])
# ################################### Health Check ##################################################################
@web.route("/health")
def health_check():
    """Report service health as JSON.

    Opens ``metadata.db`` in the library location and runs ``SELECT 1``.
    A "locked" ``sqlite3.OperationalError`` is retried (three attempts in
    total, 0.1 s apart); any other failure marks the database as down.

    Returns:
        A ``(response, status_code)`` tuple: status "ok" with HTTP 200 when
        the probe succeeds, otherwise status "degraded" with HTTP 503. The
        payload also carries ``uptime`` (``time.time() - _start_time``) and
        the installed version string.
    """
    uptime = time.time() - _start_time
    db_up = False
    try:
        db_path = os.path.join(cwa_get_library_location(), "metadata.db")
        retries = 3
        while retries:
            try:
                conn = sqlite3.connect(db_path, timeout=30)
                try:
                    conn.cursor().execute("SELECT 1")
                finally:
                    # Always release the handle, including when the probe
                    # raises (e.g. on a locked database before a retry).
                    conn.close()
                db_up = True
                break
            except sqlite3.OperationalError as e:
                if 'locked' in str(e).lower() and retries > 1:
                    time.sleep(0.1)
                    retries -= 1
                    continue
                raise
    except Exception:
        db_up = False
    return jsonify({
        "status": "ok" if db_up else "degraded",
        "uptime": uptime,
        "version": f"CWA/{constants.INSTALLED_VERSION}",
    }), 200 if db_up else 503
# ################################### View Books list ##################################################################
@web.route("/", defaults={'page': 1})
@web.route('/page/<int:page>')
@login_required_if_no_ano
def index(page):
    """Render the start page: the "newest" book list for the given page.

    Authenticated admins additionally get any architecture warning flashed.
    The sort key comes from the ``sort`` query argument (default 'stored')
    and is lower-cased before use.
    """
    viewer_is_admin = current_user.is_authenticated and current_user.role_admin()
    if viewer_is_admin:
        arch_warning = helper.check_architecture()
        if arch_warning:
            flash(arch_warning, category="cwa_arch_warning")

    requested_sort = request.args.get('sort') or 'stored'
    return render_books_list("newest", requested_sort.lower(), 1, page)
@web.route('/<data>/<sort_param>', defaults={'page': 1, 'book_id': 1})
@web.route('/<data>/<sort_param>/', defaults={'page': 1, 'book_id': 1})
@web.route('/<data>/<sort_param>/<book_id>', defaults={'page': 1})
@web.route('/<data>/<sort_param>/<book_id>/<int:page>')
@login_required_if_no_ano
def books_list(data, sort_param, book_id, page):
    """Generic list route: forward all URL parts unchanged to ``render_books_list``.

    ``book_id`` and ``page`` default to 1 when the URL omits them.
    """
    rendered = render_books_list(data, sort_param, book_id, page)
    return rendered
@web.route("/magicshelf/preview", methods=["POST"])
@user_login_required
def preview_magic_shelf():
    """Report how many books match a rule set without saving a shelf.

    Expects a JSON body with a ``rules`` object that itself contains a
    non-empty ``rules`` entry. Answers with the total match count and the
    titles of up to five matching books, or with a JSON error (400 for bad
    input, 500 when evaluating the rules fails).
    """
    try:
        payload = request.get_json()
        rules = payload.get('rules')
        if not (rules and rules.get('rules')):
            return jsonify({"success": False, "message": _("No rules provided")}), 400

        try:
            rule_filter = magic_shelf.build_query_from_rules(rules, user_id=current_user.id)
            if rule_filter is None:
                return jsonify({"success": False, "message": _("Invalid rules format")}), 400

            cdb = db.CalibreDB(init=True)
            matching = (cdb.session.query(db.Books)
                        .filter(rule_filter)
                        .filter(cdb.common_filters()))

            total_count = matching.count()
            # Only a small sample of titles is sent back to the client.
            sample_titles = [book.title for book in matching.limit(5).all()]

            return jsonify({
                "success": True,
                "count": total_count,
                "sample_books": sample_titles
            })
        except Exception as e:
            log.error(f"Error previewing magic shelf rules: {str(e)}", exc_info=True)
            return jsonify({"success": False, "message": _("Error processing rules")}), 500
    except Exception as e:
        # Reached for an unusable request body (e.g. no JSON object).
        log.error(f"Error in preview_magic_shelf: {e}")
        return jsonify({"success": False, "message": _("Invalid request")}), 400
@web.route("/magicshelf", methods=["GET", "POST"])
@user_login_required
def create_magic_shelf():
    """Create a magic shelf (POST, JSON body) or render the creation form (GET).

    POST expects a JSON object with ``name`` and ``rules`` and optionally
    ``icon``, ``kobo_sync`` and ``is_public``. It answers with JSON:
    ``{"success": True, "shelf_id": ...}`` on success, HTTP 400 for invalid
    input and HTTP 500 when saving fails.

    GET renders ``magic_shelf_edit.html`` with the icon suggestions and a
    map of language code to display name.
    """
    # Curated emoji list for magic shelf icons - organized by category.
    # It is only passed to the form template; submitted icons are not
    # checked against it.
    ALLOWED_ICONS = [
        # Books & Reading
        '📚', '📖', '📕', '📗', '📘', '📙', '📔', '📓', '📒', '📰',
        # Stars & Favorites
        '', '🌟', '', '💫', '🌠', '', '🔥', '💥', '🎯', '🏆',
        # Hearts
        '❤️', '💙', '💚', '💛', '🧡', '💜', '🖤', '🤍', '💖', '💝',
        # Entertainment
        '🎭', '🎬', '🎪', '🎨', '🎮', '🎲', '🎰', '🎳', '🎱', '🎸',
        # Travel & Space
        '🚀', '🛸', '🌌', '🌍', '🌎', '🌏', '🗺️', '🧭', '⛰️', '🏔️',
        # Fantasy & Magic
        '🔮', '🎃', '👻', '🦄', '🐉', '🐲', '🧙', '🧚', '🧛', '🧜',
        # Awards & Achievement
        '🥇', '🥈', '🥉', '🏅', '🎖️', '👑', '💎', '💍', '🔱', '🎗️',
        # Time & Organization
        '', '⏱️', '', '', '🕰️', '🔔', '📅', '📆', '📌', '📍',
        # Learning & Science
        '🎓', '🏫', '📝', '✏️', '📐', '📏', '🔬', '🔭', '🖌️', '🖍️',
        # Nature & Weather
        '🌈', '☀️', '🌙', '🌸', '🌺', '🌻', '🌹', '🌷', '🍀', '🌱'
    ]
    if request.method == "POST":
        data = request.get_json()
        # A JSON body that is not an object (e.g. ``null`` or a list) has no
        # ``.get``; answer with a JSON error instead of an unhandled exception.
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": _("Invalid request")}), 400
        name = strip_whitespaces(data.get('name', ''))
        rules = data.get('rules')
        icon = data.get('icon', '🪄')
        kobo_sync = data.get('kobo_sync', False)
        is_public = data.get('is_public', False)
        # Only allow public if user has permission
        if is_public and not current_user.role_edit_shelfs():
            is_public = False
        # Validate inputs
        if not name or not rules:
            return jsonify({"success": False, "message": _("Name and rules are required")}), 400
        if len(name) > 100:
            return jsonify({"success": False, "message": _("Shelf name too long (max 100 characters)")}), 400
        # Use default icon if none provided, otherwise accept any emoji/symbol
        if not icon:
            icon = '🪄'
        try:
            new_shelf = ub.MagicShelf(
                name=name,
                user_id=current_user.id,
                rules=rules,
                icon=icon,
                kobo_sync=kobo_sync,
                is_public=1 if is_public else 0
            )
            ub.session.add(new_shelf)
            ub.session_commit()
            log.info(f"User {current_user.id} created magic shelf '{name}' (ID: {new_shelf.id})")
            return jsonify({"success": True, "shelf_id": new_shelf.id})
        except Exception as e:
            log.error(f"Error creating magic shelf: {e}")
            ub.session.rollback()
            return jsonify({"success": False, "message": _("Error creating shelf")}), 500
    # For GET request, render the creation form
    # Fetch available languages for the dropdown
    languages = calibre_db.session.query(db.Languages).all()
    language_map = {}
    for lang in languages:
        try:
            lang_name = isoLanguages.get_language_name(get_locale(), lang.lang_code)
            language_map[lang.lang_code] = lang_name
        except Exception:
            # Fall back to the raw code when no display name can be resolved.
            language_map[lang.lang_code] = lang.lang_code
    return render_title_template('magic_shelf_edit.html',
                                 title=_("Create Magic Shelf"),
                                 page="magic_shelf_create",
                                 allowed_icons=ALLOWED_ICONS,
                                 languages=language_map)
@web.route("/magicshelf/<int:shelf_id>/edit", methods=["GET", "POST"])
@user_login_required
def edit_magic_shelf(shelf_id):
    """Update a magic shelf (POST, JSON body) or render its edit form (GET).

    Aborts with 404 when the shelf does not exist and with 403 unless the
    current user owns it or is an admin. On POST, fields missing from the
    JSON object keep the shelf's current values; changing the public flag
    additionally requires the edit-shelves role. A successful update also
    drops the shelf's cached query rows and its cached count in the session.
    """
    # Curated emoji list for magic shelf icons - organized by category.
    # It is only passed to the form template; submitted icons are not
    # checked against it.
    ALLOWED_ICONS = [
        # Books & Reading
        '📚', '📖', '📕', '📗', '📘', '📙', '📔', '📓', '📒', '📰',
        # Stars & Favorites
        '', '🌟', '', '💫', '🌠', '', '🔥', '💥', '🎯', '🏆',
        # Hearts
        '❤️', '💙', '💚', '💛', '🧡', '💜', '🖤', '🤍', '💖', '💝',
        # Entertainment
        '🎭', '🎬', '🎪', '🎨', '🎮', '🎲', '🎰', '🎳', '🎱', '🎸',
        # Travel & Space
        '🚀', '🛸', '🌌', '🌍', '🌎', '🌏', '🗺️', '🧭', '⛰️', '🏔️',
        # Fantasy & Magic
        '🔮', '🎃', '👻', '🦄', '🐉', '🐲', '🧙', '🧚', '🧛', '🧜',
        # Awards & Achievement
        '🥇', '🥈', '🥉', '🏅', '🎖️', '👑', '💎', '💍', '🔱', '🎗️',
        # Time & Organization
        '', '⏱️', '', '', '🕰️', '🔔', '📅', '📆', '📌', '📍',
        # Learning & Science
        '🎓', '🏫', '📝', '✏️', '📐', '📏', '🔬', '🔭', '🖌️', '🖍️',
        # Nature & Weather
        '🌈', '☀️', '🌙', '🌸', '🌺', '🌻', '🌹', '🌷', '🍀', '🌱'
    ]
    shelf = ub.session.query(ub.MagicShelf).get(shelf_id)
    if not shelf:
        log.warning(f"Magic shelf {shelf_id} not found")
        abort(404)
    # Check if user can edit this shelf (owner or admin only)
    if shelf.user_id != current_user.id and not current_user.role_admin():
        log.warning(f"User {current_user.id} attempted to edit magic shelf {shelf_id} without permission")
        abort(403)
    if request.method == "POST":
        data = request.get_json()
        # A JSON body that is not an object (e.g. ``null`` or a list) has no
        # ``.get``; answer with a JSON error instead of an unhandled exception.
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": _("Invalid request")}), 400
        name = strip_whitespaces(data.get('name', shelf.name))
        rules = data.get('rules', shelf.rules)
        icon = data.get('icon', shelf.icon)
        kobo_sync = data.get('kobo_sync', shelf.kobo_sync)
        is_public = data.get('is_public', shelf.is_public == 1)
        # Only allow changing public status if user has permission
        if is_public != (shelf.is_public == 1):
            if not current_user.role_edit_shelfs():
                return jsonify({"success": False, "message": _("Permission denied to change public status")}), 403
        # Validate inputs
        if not name:
            return jsonify({"success": False, "message": _("Shelf name is required")}), 400
        if len(name) > 100:
            return jsonify({"success": False, "message": _("Shelf name too long (max 100 characters)")}), 400
        # Use default icon if none provided, otherwise accept any emoji/symbol
        if not icon:
            icon = '🪄'
        try:
            shelf.name = name
            shelf.rules = rules
            shelf.icon = icon
            shelf.kobo_sync = kobo_sync
            shelf.is_public = 1 if is_public else 0
            # Explicitly mark the rules attribute as changed so it is persisted.
            flag_modified(shelf, "rules")
            # Invalidate Complex Query Cache
            ub.session.query(ub.MagicShelfCache).filter_by(shelf_id=shelf.id).delete()
            ub.session_commit()
            # Invalidate the per-session count cache for this shelf
            if 'magic_shelf_counts' in flask_session:
                counts = flask_session['magic_shelf_counts']
                if str(shelf_id) in counts:
                    del counts[str(shelf_id)]
                    flask_session.modified = True
            log.info(f"User {current_user.id} updated magic shelf {shelf_id} ('{name}') with icon '{icon}'")
            return jsonify({"success": True})
        except Exception as e:
            log.error(f"Error updating magic shelf {shelf_id}: {e}")
            ub.session.rollback()
            return jsonify({"success": False, "message": _("Error updating shelf")}), 500
    # For GET request, render the edit form
    # Fetch available languages for the dropdown
    languages = calibre_db.session.query(db.Languages).all()
    language_map = {}
    for lang in languages:
        try:
            lang_name = isoLanguages.get_language_name(get_locale(), lang.lang_code)
            language_map[lang.lang_code] = lang_name
        except Exception:
            # Fall back to the raw code when no display name can be resolved.
            language_map[lang.lang_code] = lang.lang_code
    return render_title_template('magic_shelf_edit.html',
                                 shelf=shelf,
                                 title=_("Edit Magic Shelf"),
                                 page="magic_shelf_edit",
                                 allowed_icons=ALLOWED_ICONS,
                                 languages=language_map)
@web.route("/magicshelf/<int:shelf_id>/duplicate", methods=["POST"])
@user_login_required
def duplicate_magic_shelf(shelf_id):
    """Copy a magic shelf into a new private shelf owned by the current user.

    The source must be the user's own shelf or a public one. The copy is
    named "<name> (Copy)", or "<name> (Copy N)" with N starting at 2 when
    that name is already used by one of the user's shelves. Responds with
    JSON; 404 / 403 / 500 carry ``success: False`` and a message.
    """
    shelf = ub.session.query(ub.MagicShelf).get(shelf_id)
    if not shelf:
        log.warning(f"Magic shelf {shelf_id} not found for duplication")
        return jsonify({"success": False, "message": _("Shelf not found")}), 404

    # Own shelves and public shelves may be duplicated, nothing else.
    if not (shelf.user_id == current_user.id or shelf.is_public):
        log.warning(f"User {current_user.id} attempted to duplicate private shelf {shelf_id} owned by {shelf.user_id}")
        return jsonify({"success": False, "message": _("Permission denied")}), 403

    def name_in_use(candidate):
        """Tell whether the current user already has a shelf with this name."""
        return ub.session.query(ub.MagicShelf).filter(
            ub.MagicShelf.user_id == current_user.id,
            ub.MagicShelf.name == candidate
        ).first()

    try:
        new_name = f"{shelf.name} (Copy)"
        counter = 1
        while name_in_use(new_name):
            counter += 1
            new_name = f"{shelf.name} (Copy {counter})"

        if shelf.rules:
            copied_rules = shelf.rules.copy()
        else:
            copied_rules = {}

        duplicate_shelf = ub.MagicShelf(
            user_id=current_user.id,
            name=new_name,
            icon=shelf.icon,
            rules=copied_rules,
            is_system=False,  # a copy is never a system shelf
            is_public=0  # a copy starts out private
        )
        ub.session.add(duplicate_shelf)
        ub.session_commit()
        log.info(f"User {current_user.id} duplicated magic shelf {shelf_id} as '{new_name}' (ID: {duplicate_shelf.id})")
        return jsonify({
            "success": True,
            "shelf_id": duplicate_shelf.id,
            "message": _("Shelf duplicated successfully")
        })
    except Exception as e:
        log.error(f"Error duplicating magic shelf {shelf_id}: {e}")
        ub.session.rollback()
        return jsonify({"success": False, "message": _("Error duplicating shelf")}), 500
@web.route("/magicshelf/<int:shelf_id>/delete", methods=["POST"])
@user_login_required
def delete_magic_shelf(shelf_id):
    """Delete a magic shelf together with its cache rows and hide records.

    Aborts with 404 for an unknown shelf and with 403 unless the current
    user owns it, or it is public and the user has the edit-shelves role.
    System shelves are refused with a JSON 400. Otherwise responds with
    ``{"success": True}`` or a JSON 500 on failure.
    """
    shelf = ub.session.query(ub.MagicShelf).get(shelf_id)
    if not shelf:
        log.warning(f"Magic shelf {shelf_id} not found for deletion")
        abort(404)

    may_delete = shelf.user_id == current_user.id or (
        shelf.is_public == 1 and current_user.role_edit_shelfs()
    )
    if not may_delete:
        log.warning(f"User {current_user.id} attempted to delete magic shelf {shelf_id} without permission")
        abort(403)

    # System shelves are protected even from users who pass the check above.
    if shelf.is_system:
        log.warning(f"User {current_user.id} attempted to delete system shelf {shelf_id}")
        return jsonify({
            "success": False,
            "message": _("System shelves cannot be deleted. You can hide them in your user profile settings.")
        }), 400

    try:
        # Remember the name for the log line written after the delete.
        shelf_name = shelf.name
        # Dependent rows go first: cache entries, then per-user hide records.
        for dependent in (ub.MagicShelfCache, ub.HiddenMagicShelfTemplate):
            ub.session.query(dependent).filter_by(shelf_id=shelf_id).delete()
        ub.session.delete(shelf)
        ub.session_commit()
        log.info(f"User {current_user.id} deleted magic shelf {shelf_id} ('{shelf_name}')")
        return jsonify({"success": True})
    except Exception as e:
        log.error(f"Error deleting magic shelf {shelf_id}: {e}")
        ub.session.rollback()
        return jsonify({"success": False, "message": _("Error deleting shelf")}), 500
@web.route("/magicshelf/<int:shelf_id>/hide", methods=["POST"])
@user_login_required
def hide_magic_shelf(shelf_id):
    """Record that the current user wants a shelf owned by someone else hidden.

    Aborts with 404 for an unknown shelf. A user's own shelves cannot be
    hidden (JSON 400). Hiding an already hidden shelf succeeds without
    adding a second record.
    """
    shelf = ub.session.query(ub.MagicShelf).get(shelf_id)
    if not shelf:
        log.warning(f"Magic shelf {shelf_id} not found")
        abort(404)

    if shelf.user_id == current_user.id:
        return jsonify({
            "success": False,
            "message": _("You cannot hide your own shelves. Delete them instead if you don't want them.")
        }), 400

    already_hidden = ub.session.query(ub.HiddenMagicShelfTemplate).filter(
        ub.HiddenMagicShelfTemplate.user_id == current_user.id,
        ub.HiddenMagicShelfTemplate.shelf_id == shelf_id
    ).first()
    if already_hidden:
        return jsonify({"success": True, "message": _("Shelf already hidden")})

    try:
        ub.session.add(ub.HiddenMagicShelfTemplate(
            user_id=current_user.id,
            shelf_id=shelf_id
        ))
        ub.session_commit()
        log.info(f"User {current_user.id} hid magic shelf {shelf_id} ('{shelf.name}')")
        return jsonify({"success": True})
    except Exception as e:
        log.error(f"Error hiding magic shelf {shelf_id}: {e}")
        ub.session.rollback()
        return jsonify({"success": False, "message": _("Error hiding shelf")}), 500
@web.route("/magicshelf/<int:shelf_id>/unhide", methods=["POST"])
@user_login_required
def unhide_magic_shelf(shelf_id):
    """Remove the current user's hide record for a shelf, if there is one.

    Responds with ``{"success": True}`` both when a record was removed and
    when none existed; a failure yields a JSON 500.
    """
    try:
        hide_record = ub.session.query(ub.HiddenMagicShelfTemplate).filter(
            ub.HiddenMagicShelfTemplate.user_id == current_user.id,
            ub.HiddenMagicShelfTemplate.shelf_id == shelf_id
        ).first()
        if hide_record:
            ub.session.delete(hide_record)
            ub.session_commit()
            log.info(f"User {current_user.id} unhid magic shelf {shelf_id}")
            return jsonify({"success": True})
        return jsonify({"success": True, "message": _("Shelf was not hidden")})
    except Exception as e:
        log.error(f"Error unhiding magic shelf {shelf_id}: {e}")
        ub.session.rollback()
        return jsonify({"success": False, "message": _("Error unhiding shelf")}), 500
@web.route("/table")
@user_login_required
def books_table():
    """Render the books table page with the user's saved table settings."""
    custom_columns = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
    # NOTE: the misspelled keyword "visiblility" is the name handed to the
    # template and must stay as it is.
    return render_title_template('book_table.html',
                                 title=_("Books List"),
                                 cc=custom_columns,
                                 page="book_table",
                                 visiblility=current_user.view_settings.get('table', {}))
@web.route("/ajax/listbooks")
@user_login_required
def list_books():
    """Return one slice of the book list as JSON for the books table.

    Query arguments read: ``offset``, ``limit`` (default: configured books
    per page), ``search``, ``sort`` (default "id"), ``order`` and, when
    ``sort`` is "state", a JSON list in ``state``.

    The response body is a JSON object with ``totalNotFiltered``, ``total``
    and ``rows``.
    """
    off = int(request.args.get("offset") or 0)
    limit = int(request.args.get("limit") or config.config_books_per_page)
    search_param = request.args.get("search")
    sort_param = request.args.get("sort", "id")
    order = request.args.get("order", "").lower()
    state = None
    join = tuple()
    if sort_param == "state":
        state = json.loads(request.args.get("state", "[]"))
    elif sort_param == "tags":
        order = [db.Tags.name.asc()] if order == "asc" else [db.Tags.name.desc()]
        join = db.books_tags_link, db.Books.id == db.books_tags_link.c.book, db.Tags
    elif sort_param == "series":
        order = [db.Series.name.asc()] if order == "asc" else [db.Series.name.desc()]
        join = db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series
    elif sort_param == "publishers":
        order = [db.Publishers.name.asc()] if order == "asc" else [db.Publishers.name.desc()]
        join = db.books_publishers_link, db.Books.id == db.books_publishers_link.c.book, db.Publishers
    elif sort_param == "authors":
        order = [db.Authors.name.asc(), db.Series.name, db.Books.series_index] if order == "asc" \
            else [db.Authors.name.desc(), db.Series.name.desc(), db.Books.series_index.desc()]
        join = db.books_authors_link, db.Books.id == db.books_authors_link.c.book, db.Authors, db.books_series_link, \
            db.Books.id == db.books_series_link.c.book, db.Series
    elif sort_param == "author_sort":
        order = [db.Books.author_sort.asc()] if order == "asc" else [db.Books.author_sort.desc()]
    elif sort_param == "languages":
        order = [db.Languages.lang_code.asc()] if order == "asc" else [db.Languages.lang_code.desc()]
        join = db.books_languages_link, db.Books.id == db.books_languages_link.c.book, db.Languages
    elif order in ("asc", "desc") and sort_param in ["sort", "title", "authors_sort", "series_index"]:
        # Both parts of this raw ORDER BY fragment come from the request, so
        # each is restricted to a fixed set of values before being combined.
        # Any other direction falls through to the default ordering below.
        order = [text(sort_param + " " + order)]
    elif not state:
        order = [db.Books.timestamp.desc()]
    total_count = filtered_count = calibre_db.session.query(db.Books).filter(
        calibre_db.common_filters(allow_show_archived=True)).count()
    if state is not None:
        if search_param:
            books = calibre_db.search_query(search_param, config).all()
            filtered_count = len(books)
        else:
            query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
            books = query.filter(calibre_db.common_filters(allow_show_archived=True)).all()
        entries = calibre_db.get_checkbox_sorted(books, state, off, limit, order, True)
    elif search_param:
        entries, filtered_count, __ = calibre_db.get_search_results(search_param,
                                                                    config,
                                                                    off,
                                                                    [order, ''],
                                                                    limit,
                                                                    *join)
    else:
        # The page argument is derived from offset and limit.
        entries, __, __ = calibre_db.fill_indexpage_with_archived_books((int(off) / (int(limit)) + 1),
                                                                        db.Books,
                                                                        limit,
                                                                        True,
                                                                        order,
                                                                        True,
                                                                        True,
                                                                        config.config_read_column,
                                                                        *join)
    result = list()
    for entry in entries:
        # Each entry is indexed as (book, archived flag, read status).
        val = entry[0]
        val.is_archived = entry[1] is True
        val.read_status = entry[2] == ub.ReadBook.STATUS_FINISHED
        for language in val.languages:
            language.language_name = isoLanguages.get_language_name(get_locale(), language.lang_code)
        result.append(val)
    table_entries = {'totalNotFiltered': total_count, 'total': filtered_count, "rows": result}
    js_list = json.dumps(table_entries, cls=db.AlchemyEncoder)
    response = make_response(js_list)
    response.headers["Content-Type"] = "application/json; charset=utf-8"
    return response
@web.route("/ajax/table_settings", methods=['POST'])
@user_login_required
def update_table_settings():
    """Store the posted JSON as the current user's ``table`` view settings.

    Returns an empty body on success. A body that is not valid JSON, or a
    failing commit, yields ``("Invalid request", 400)``.
    """
    try:
        table_settings = json.loads(request.data)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors;
        # report a bad body as 400 instead of letting it become a 500.
        log.error("Invalid request received: %r ", request, )
        return "Invalid request", 400
    current_user.view_settings['table'] = table_settings
    try:
        try:
            # In-place changes to the settings dict must be flagged explicitly.
            flag_modified(current_user, "view_settings")
        except AttributeError:
            pass
        ub.session.commit()
    except (InvalidRequestError, OperationalError):
        # Roll back after a failed commit, as the other handlers in this file do.
        ub.session.rollback()
        log.error("Invalid request received: %r ", request, )
        return "Invalid request", 400
    return ""
@web.route("/author")
@login_required_if_no_ano
def author_list():
    """Render the list of authors with their book counts.

    Aborts with 404 when the author sidebar entry is not visible for the
    current user. The sort direction follows the user's 'author' / 'dir'
    view property (descending only when it equals 'desc').
    """
    if current_user.check_visibility(constants.SIDEBAR_AUTHOR):
        if current_user.get_view_property('author', 'dir') == 'desc':
            order = db.Authors.sort.desc()
            order_no = 0
        else:
            order = db.Authors.sort.asc()
            order_no = 1
        entries = calibre_db.session.query(db.Authors, func.count('books_authors_link.book').label('count')) \
            .join(db.books_authors_link).join(db.Books).filter(calibre_db.common_filters()) \
            .group_by(text('books_authors_link.author')).order_by(order).all()
        char_list = query_char_list(db.Authors.sort, db.books_authors_link)
        # The title goes through gettext like the titles of the other list views.
        return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list,
                                     title=_("Authors"), page="authorlist", data='author', order=order_no)
    else:
        abort(404)
@web.route("/downloadlist")
@login_required_if_no_ano
def download_list():
    """Render the per-user download counts (admins only).

    Aborts with 404 unless the download sidebar entry is visible for the
    current user and the user is an admin.
    """
    descending = current_user.get_view_property('download', 'dir') == 'desc'
    order = ub.User.name.desc() if descending else ub.User.name.asc()
    order_no = 0 if descending else 1

    if not (current_user.check_visibility(constants.SIDEBAR_DOWNLOAD) and current_user.role_admin()):
        abort(404)

    download_count = func.count(ub.Downloads.book_id).label('count')
    entries = (ub.session.query(ub.User, download_count)
               .join(ub.Downloads)
               .group_by(ub.Downloads.user_id)
               .order_by(order)
               .all())

    # Upper-cased first letters of the names of all non-anonymous users.
    char_list = (ub.session.query(func.upper(func.substr(ub.User.name, 1, 1)).label('char'))
                 .filter(ub.User.role.op('&')(constants.ROLE_ANONYMOUS) != constants.ROLE_ANONYMOUS)
                 .group_by(func.upper(func.substr(ub.User.name, 1, 1)))
                 .all())

    return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list,
                                 title=_("Downloads"), page="downloadlist", data="download", order=order_no)
@web.route("/publisher")
@login_required_if_no_ano
def publisher_list():
    """Render the list of publishers with their book counts.

    Books without a publisher are shown as a synthetic "None" entry with
    id "-1". Aborts with 404 when the publisher sidebar entry is not
    visible for the current user.
    """
    if not current_user.check_visibility(constants.SIDEBAR_PUBLISHER):
        abort(404)

    descending = current_user.get_view_property('publisher', 'dir') == 'desc'
    order_no = 0 if descending else 1
    if descending:
        order = db.Publishers.name.desc()
    else:
        order = db.Publishers.name.asc()

    book_count = func.count(db.books_publishers_link.c.book).label('count')
    entries = (calibre_db.session.query(db.Publishers, book_count)
               .join(db.books_publishers_link, db.Publishers.id == db.books_publishers_link.c.publisher)
               .join(db.Books, db.books_publishers_link.c.book == db.Books.id)
               .filter(calibre_db.common_filters())
               .group_by(db.Publishers.id)
               .order_by(order)
               .all())

    # Books that have no row in the publisher link table.
    no_publisher_count = (calibre_db.session.query(func.count(db.Books.id))
                          .outerjoin(db.books_publishers_link)
                          .filter(db.books_publishers_link.c.book == None)
                          .filter(calibre_db.common_filters())
                          .scalar())
    if no_publisher_count:
        none_publisher_entry = (db.Category(_("None"), "-1"), no_publisher_count)
        # First when ascending, last when descending.
        if order_no == 1:
            entries.insert(0, none_publisher_entry)
        else:
            entries.append(none_publisher_entry)

    char_list = sorted({entry[0].name[0].upper() for entry in entries if entry[0].name})
    return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list,
                                 title=_("Publishers"), page="publisherlist", data="publisher", order=order_no)
@web.route("/series")
@login_required_if_no_ano
def series_list():
    """Render the series overview as a list or as a grid.

    The user's 'series' / 'series_view' property selects the layout: 'list'
    renders ``list.html`` (including a "None" entry for books without a
    series), anything else renders ``grid.html``. Aborts with 404 when the
    series sidebar entry is not visible for the current user.
    """
    if not current_user.check_visibility(constants.SIDEBAR_SERIES):
        abort(404)

    descending = current_user.get_view_property('series', 'dir') == 'desc'
    order = db.Series.sort.desc() if descending else db.Series.sort.asc()
    order_no = 0 if descending else 1
    char_list = query_char_list(db.Series.sort, db.books_series_link)

    if current_user.get_view_property('series', 'series_view') != 'list':
        grid_entries = (calibre_db.session.query(db.Books, func.count('books_series_link').label('count'),
                                                 func.max(db.Books.series_index), db.Books.id)
                        .join(db.books_series_link).join(db.Series).filter(calibre_db.common_filters())
                        .group_by(text('books_series_link.series'))
                        .having(or_(func.max(db.Books.series_index), db.Books.series_index==""))
                        .order_by(order)
                        .all())
        return render_title_template('grid.html', entries=grid_entries, folder='web.books_list', charlist=char_list,
                                     title=_("Series"), page="serieslist", data="series", bodyClass="grid-view",
                                     order=order_no)

    entries = (calibre_db.session.query(db.Series, func.count('books_series_link.book').label('count'))
               .join(db.books_series_link).join(db.Books).filter(calibre_db.common_filters())
               .group_by(text('books_series_link.series')).order_by(order).all())
    # Books without any series.
    no_series_count = (calibre_db.session.query(db.Books)
                       .outerjoin(db.books_series_link).outerjoin(db.Series)
                       .filter(db.Series.name == None)
                       .filter(calibre_db.common_filters())
                       .count())
    if no_series_count:
        entries.append([db.Category(_("None"), "-1"), no_series_count])
    # Re-sort by lower-cased name so the appended entry lands in place.
    entries = sorted(entries, key=lambda row: row[0].name.lower(), reverse=not order_no)
    return render_title_template('list.html',
                                 entries=entries,
                                 folder='web.books_list',
                                 charlist=char_list,
                                 title=_("Series"),
                                 page="serieslist",
                                 data="series", order=order_no)
@web.route("/ratings")
@login_required_if_no_ano
def ratings_list():
    """Render the list of ratings with their book counts.

    Each row carries the stored rating divided by two as ``name``. Books
    with no rating, or a rating of 0, are shown as a "None" entry. Aborts
    with 404 when the rating sidebar entry is not visible for the current
    user.
    """
    if not current_user.check_visibility(constants.SIDEBAR_RATING):
        abort(404)

    descending = current_user.get_view_property('ratings', 'dir') == 'desc'
    order_no = 0 if descending else 1
    if descending:
        order = db.Ratings.rating.desc()
    else:
        order = db.Ratings.rating.asc()

    entries = (calibre_db.session.query(db.Ratings, func.count(db.books_ratings_link.c.book).label('count'),
                                        (db.Ratings.rating / 2).label('name'))
               .join(db.books_ratings_link, db.Ratings.id == db.books_ratings_link.c.rating)
               .join(db.Books, db.books_ratings_link.c.book == db.Books.id)
               .filter(calibre_db.common_filters())
               .filter(db.Ratings.rating > 0)
               .group_by(db.Ratings.id)
               .order_by(order)
               .all())

    # Unrated means: no link row at all, or a linked rating of 0.
    no_rating_count = (calibre_db.session.query(func.count(db.Books.id))
                       .outerjoin(db.books_ratings_link, db.Books.id == db.books_ratings_link.c.book)
                       .outerjoin(db.Ratings, db.books_ratings_link.c.rating == db.Ratings.id)
                       .filter(calibre_db.common_filters())
                       .filter(or_(db.books_ratings_link.c.rating == None, db.Ratings.rating == 0))
                       .scalar())
    if no_rating_count:
        none_rating_entry = (db.Category(_("None"), "-1"), no_rating_count, 0)
        # First when ascending, last when descending.
        if order_no == 1:
            entries.insert(0, none_rating_entry)
        else:
            entries.append(none_rating_entry)

    return render_title_template('list.html', entries=entries, folder='web.books_list',
                                 title=_("Ratings"), page="ratingslist", data="ratings", order=order_no)
@web.route("/formats")
@login_required_if_no_ano
def formats_list():
    """Render the list of file formats with their book counts.

    Books without any format are appended as a "None" entry. Aborts with
    404 when the format sidebar entry is not visible for the current user.
    """
    if not current_user.check_visibility(constants.SIDEBAR_FORMAT):
        abort(404)

    descending = current_user.get_view_property('formats', 'dir') == 'desc'
    order = db.Data.format.desc() if descending else db.Data.format.asc()
    order_no = 0 if descending else 1

    entries = (calibre_db.session.query(db.Data,
                                        func.count('data.book').label('count'),
                                        db.Data.format.label('format'))
               .join(db.Books).filter(calibre_db.common_filters())
               .group_by(db.Data.format).order_by(order).all())

    no_format_count = (calibre_db.session.query(db.Books).outerjoin(db.Data)
                       .filter(db.Data.format == None)
                       .filter(calibre_db.common_filters())
                       .count())
    if no_format_count:
        entries.append([db.Category(_("None"), "-1"), no_format_count])

    return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=[],
                                 title=_("File formats list"), page="formatslist", data="formats", order=order_no)
@web.route("/language")
@login_required_if_no_ano
def language_overview():
    """Render the list of languages with their book counts.

    Aborts with 404 unless the language sidebar entry is visible for the
    current user and the user's language filter is "all".
    """
    allowed = (current_user.check_visibility(constants.SIDEBAR_LANGUAGE)
               and current_user.filter_language() == "all")
    if not allowed:
        abort(404)

    if current_user.get_view_property('language', 'dir') == 'desc':
        order_no = 0
    else:
        order_no = 1
    languages = calibre_db.speaking_language(reverse_order=not order_no, with_count=True)
    return render_title_template('list.html', entries=languages, folder='web.books_list',
                                 charlist=generate_char_list(languages),
                                 title=_("Languages"), page="langlist", data="language", order=order_no)
@web.route("/category")
@login_required_if_no_ano
def category_list():
    """Render the list of categories (tags) with their book counts.

    Books without any tag are added as a "None" entry, after which the whole
    list is re-sorted by lower-cased name. Aborts with 404 when the category
    sidebar entry is not visible for the current user.
    """
    if not current_user.check_visibility(constants.SIDEBAR_CATEGORY):
        abort(404)

    descending = current_user.get_view_property('category', 'dir') == 'desc'
    order = db.Tags.name.desc() if descending else db.Tags.name.asc()
    order_no = 0 if descending else 1

    entries = (calibre_db.session.query(db.Tags, func.count('books_tags_link.book').label('count'))
               .join(db.books_tags_link).join(db.Books).order_by(order).filter(calibre_db.common_filters())
               .group_by(db.Tags.id).all())

    no_tag_count = (calibre_db.session.query(db.Books)
                    .outerjoin(db.books_tags_link).outerjoin(db.Tags)
                    .filter(db.Tags.name == None)
                    .filter(calibre_db.common_filters())
                    .count())
    if no_tag_count:
        entries.append([db.Category(_("None"), "-1"), no_tag_count])

    entries = sorted(entries, key=lambda row: row[0].name.lower(), reverse=not order_no)
    return render_title_template('list.html', entries=entries, folder='web.books_list',
                                 charlist=generate_char_list(entries),
                                 title=_("Categories"), page="catlist", data="category", order=order_no)
# ################################### Download/Send ##################################################################
@web.route("/cover/<int:book_id>")
@web.route("/cover/<int:book_id>/<string:resolution>")
@login_required_if_no_ano
def get_cover(book_id, resolution=None):
    """Serve a book cover, optionally at a named thumbnail resolution.

    ``resolution`` is one of 'og', 'sm', 'md' or 'lg'; any other value
    (including a missing one) passes ``None`` on to ``get_book_cover``.
    """
    thumbnail_sizes = {
        'og': constants.COVER_THUMBNAIL_ORIGINAL,
        'sm': constants.COVER_THUMBNAIL_SMALL,
        'md': constants.COVER_THUMBNAIL_MEDIUM,
        'lg': constants.COVER_THUMBNAIL_LARGE,
    }
    return get_book_cover(book_id, thumbnail_sizes.get(resolution))
@web.route("/series_cover/<int:series_id>")
@web.route("/series_cover/<int:series_id>/<string:resolution>")
@login_required_if_no_ano
def get_series_cover(series_id, resolution=None):
    """Serve a series cover thumbnail, optionally at a named resolution.

    ``resolution`` is one of 'og', 'sm', 'md' or 'lg'; any other value
    (including a missing one) passes ``None`` on to
    ``get_series_cover_thumbnail``.
    """
    thumbnail_sizes = {
        'og': constants.COVER_THUMBNAIL_ORIGINAL,
        'sm': constants.COVER_THUMBNAIL_SMALL,
        'md': constants.COVER_THUMBNAIL_MEDIUM,
        'lg': constants.COVER_THUMBNAIL_LARGE,
    }
    return get_series_cover_thumbnail(series_id, thumbnail_sizes.get(resolution))
@web.route("/robots.txt")
def get_robots():
    """Serve ``robots.txt`` from the static directory; 403 if it is not readable."""
    try:
        robots_response = send_from_directory(constants.STATIC_DIR, "robots.txt")
    except PermissionError:
        log.error("No permission to access robots.txt file.")
        abort(403)
    else:
        return robots_response
def _is_valid_container_xml(container_bytes):
try:
ET.fromstring(container_bytes)
return True
except Exception:
return False
def _sanitize_container_xml(container_bytes):
try:
text = container_bytes.decode("utf-8", errors="replace")
except Exception:
return container_bytes
decl_pattern = re.compile(r"<\?xml[^>]*\?>")
decls = list(decl_pattern.finditer(text))
if len(decls) <= 1:
return container_bytes
first = decls[0]
cleaned = text[:first.end()] + decl_pattern.sub("", text[first.end():])
return cleaned.encode("utf-8")
def _get_fixed_epub_path(book_id, original_path):
    """Build the path of the repaired copy for a book's EPUB.

    The file lives in ``<CONFIG_DIR>/epub_fixes`` and is named from the book
    id and the original file's integer mtime (0 when the mtime cannot be
    read). Returns None when that directory cannot be created.
    """
    fix_dir = os.path.join(constants.CONFIG_DIR, "epub_fixes")
    try:
        os.makedirs(fix_dir, exist_ok=True)
    except Exception:
        return None

    try:
        stamp = os.path.getmtime(original_path)
    except Exception:
        mtime = 0
    else:
        mtime = int(stamp)
    return os.path.join(fix_dir, f"{book_id}_{mtime}.epub")
def _repair_epub_container_if_needed(book_id, original_path):
    """Return the path of a repaired copy of an EPUB, or None.

    The EPUB's ``META-INF/container.xml`` is checked first. When it already
    parses, no copy is made and None is returned. Otherwise the archive is
    copied entry by entry into the fixed-EPUB location with a sanitized
    ``container.xml``; a copy that already exists is reused.

    Returns None as well when the archive has no ``container.xml``, when no
    target path is available, or when the repair fails (the failure is
    logged).
    """
    try:
        with zipfile.ZipFile(original_path, "r") as zin:
            container_bytes = zin.read("META-INF/container.xml")
        if _is_valid_container_xml(container_bytes):
            return None
        fixed_path = _get_fixed_epub_path(book_id, original_path)
        if not fixed_path:
            return None
        if os.path.exists(fixed_path):
            return fixed_path
        # Write to a temporary name and rename at the end so a reader never
        # sees a half-written archive under the final name.
        temp_path = fixed_path + ".tmp"
        try:
            with zipfile.ZipFile(original_path, "r") as zin, zipfile.ZipFile(temp_path, "w") as zout:
                for item in zin.infolist():
                    data = zin.read(item.filename)
                    if item.filename == "META-INF/container.xml":
                        data = _sanitize_container_xml(data)
                    # Copy the entry's metadata onto a fresh ZipInfo.
                    zi = zipfile.ZipInfo(item.filename)
                    zi.date_time = item.date_time
                    zi.compress_type = item.compress_type
                    zi.external_attr = item.external_attr
                    zi.internal_attr = item.internal_attr
                    zi.extra = item.extra
                    zi.comment = item.comment
                    zout.writestr(zi, data, compress_type=item.compress_type)
            os.replace(temp_path, fixed_path)
        except Exception:
            # Do not leave a partial temporary archive behind on failure.
            try:
                os.remove(temp_path)
            except OSError:
                pass
            raise
        return fixed_path
    except KeyError:
        return None
    except Exception as ex:
        log.error("Failed to repair EPUB container.xml for book %s: %s", book_id, ex)
        return None
@web.route("/show/<int:book_id>/<book_format>", defaults={'anyname': 'None'})
@web.route("/show/<int:book_id>/<book_format>/<anyname>")
@login_required_if_no_ano
@viewer_required
def serve_book(book_id, book_format, anyname):
    """Serve a book file for the in-browser readers.

    Args:
        book_id: Id of the book.
        book_format: Requested format; anything after the first ``.`` is
            dropped.
        anyname: Optional trailing URL segment; not used by the view.

    Returns:
        A response carrying the file. TXT files are re-encoded to UTF-8 and
        EPUBs with a broken ``container.xml`` are served from a repaired
        copy. Returns the plain strings ``"File not in Database"`` or
        ``"File Not Found"`` when the format or file is missing.
    """
    book_format = book_format.split(".")[0]
    book = calibre_db.get_book(book_id)
    data = calibre_db.get_book_format(book_id, book_format.upper())
    if not data:
        return "File not in Database"
    range_header = request.headers.get('Range', None)
    file_name = data.name + "." + book_format

    if config.config_use_google_drive:
        try:
            headers = Headers()
            headers["Content-Type"] = mimetypes.types_map.get('.' + book_format, "application/octet-stream")
            if not range_header:
                log.info('Serving book: %s', data.name)
                headers['Accept-Ranges'] = 'bytes'
            df = getFileFromEbooksFolder(book.path, file_name)
            return do_gdrive_download(df, headers, (book_format.upper() == 'TXT'))
        except AttributeError as ex:
            log.error_or_exception(ex)
            return "File Not Found"

    # Local storage. The Google Drive branch turns a missing book row into
    # "File Not Found" through its AttributeError handler; do the same here
    # instead of crashing on ``book.path``.
    if book is None:
        log.error("File Not Found")
        return "File Not Found"
    book_dir = os.path.join(config.get_book_path(), book.path)

    if book_format.upper() == 'EPUB':
        original_path = os.path.join(book_dir, file_name)
        fixed_path = _repair_epub_container_if_needed(book_id, original_path)
        if fixed_path:
            response = make_response(send_file(fixed_path, mimetype="application/epub+zip"))
            if not range_header:
                log.info('Serving repaired book: %s', data.name)
                response.headers['Accept-Ranges'] = 'bytes'
            return response

    if book_format.upper() == 'TXT':
        log.info('Serving book: %s', data.name)
        try:
            # Context manager so the file handle is closed deterministically.
            with open(os.path.join(book_dir, file_name), "rb") as txt_file:
                rawdata = txt_file.read()
        except FileNotFoundError:
            log.error("File Not Found")
            return "File Not Found"
        # chardet reports ``None`` when it cannot guess (e.g. empty file);
        # fall back to UTF-8 rather than passing None to decode().
        encoding = chardet.detect(rawdata)['encoding'] or 'utf-8'
        try:
            text_data = rawdata.decode(encoding).encode('utf-8')
        except UnicodeDecodeError as e:
            log.error("Encoding error in text file {}: {}".format(book.id, e))
            if "surrogate" in e.reason:
                text_data = rawdata.decode(encoding, 'surrogatepass').encode('utf-8', 'surrogatepass')
            else:
                text_data = rawdata.decode(encoding, 'ignore').encode('utf-8', 'ignore')
        return make_response(text_data)

    # enable byte range read of pdf
    response = make_response(send_from_directory(book_dir, file_name))
    if not range_header:
        log.info('Serving book: %s', data.name)
        response.headers['Accept-Ranges'] = 'bytes'
    return response
@web.route("/download/<int:book_id>/<book_format>", defaults={'anyname': 'None'})
@web.route("/download/<int:book_id>/<book_format>/<anyname>")
@login_required_if_no_ano
@download_required
def download_link(book_id, book_format, anyname):
    """Return the download response for a book format.

    The client is reported as ``"kobo"`` to ``get_download_link`` when the
    User-Agent header contains ``Kobo``, otherwise as an empty string.
    ``anyname`` is an optional trailing URL segment and is not used.
    """
    # The header may be absent; ``"Kobo" in None`` would raise TypeError.
    user_agent = request.headers.get('User-Agent') or ""
    client = "kobo" if "Kobo" in user_agent else ""
    return get_download_link(book_id, book_format, client)
@web.route('/send/<int:book_id>/<book_format>/<int:convert>', methods=["POST"])
@login_required_if_no_ano
@download_required
def send_to_ereader(book_id, book_format, convert):
    """Queue a book for e-mail delivery to the current user's eReader address.

    Returns:
        A JSON response holding a one-element list ``[{type, message}]``
        where ``type`` is ``"success"`` or ``"danger"``.
    """
    def _reply(level, message):
        # Every outcome of this view uses the same JSON envelope.
        return Response(json.dumps([{'type': level, 'message': message}]), mimetype='application/json')

    if not config.get_mail_server_configured():
        return _reply("danger", _("Please configure the SMTP mail settings first..."))
    if not current_user.kindle_mail:
        return _reply("danger", _("Oops! Please update your profile with a valid eReader Email."))

    outcome = send_mail(book_id, book_format, convert, current_user.kindle_mail, config.get_book_path(),
                        current_user.name, current_user.kindle_mail_subject)
    if outcome is not None:
        # send_mail returns an error description on failure.
        return _reply("danger", _("Oops! There was an error sending book: %(res)s", res=outcome))

    ub.update_download(book_id, int(current_user.id))
    # Track email/send activity (best effort; failures are only logged).
    try:
        from scripts.cwa_db import CWA_DB
        sent_book = calibre_db.get_book(book_id)
        activity_db = CWA_DB()
        activity_db.log_activity(
            user_id=int(current_user.id),
            user_name=current_user.name,
            event_type='EMAIL',
            item_id=book_id,
            item_title=sent_book.title if sent_book else 'Unknown',
            extra_data=book_format.upper()
        )
    except Exception as e:
        log.debug(f"Failed to log email activity: {e}")
    return _reply("success", _("Success! Book queued for sending to %(eReadermail)s",
                               eReadermail=current_user.kindle_mail))
@web.route('/send_selected/<int:book_id>', methods=["POST"])
@login_required_if_no_ano
@download_required
def send_to_selected_ereaders(book_id):
    """Queue a book for e-mail delivery to the addresses chosen in the form.

    Reads ``selected_emails``, ``book_format`` and ``convert`` from the
    submitted form.

    Returns:
        A JSON response holding a one-element list ``[{type, message}]``
        where ``type`` is ``"success"`` or ``"danger"``.
    """
    if not config.get_mail_server_configured():
        response = [{'type': "danger", 'message': _("Please configure the SMTP mail settings first...")}]
        return Response(json.dumps(response), mimetype='application/json')
    selected_emails = request.form.get('selected_emails', '')
    book_format = request.form.get('book_format', '')
    convert = request.form.get('convert', '0')
    if not selected_emails:
        response = [{'type': "danger", 'message': _("No email addresses selected")}]
        return Response(json.dumps(response), mimetype='application/json')
    # ``convert`` is client-supplied; a non-numeric value must produce a
    # regular error reply instead of an unhandled ValueError.
    try:
        convert = int(convert)
    except (TypeError, ValueError):
        response = [{'type': "danger", 'message': _("Oops! There was an error sending book: %(res)s",
                                                    res=_("Invalid conversion option"))}]
        return Response(json.dumps(response), mimetype='application/json')
    result = send_mail(book_id, book_format, convert, selected_emails, config.get_book_path(),
                       current_user.name, current_user.kindle_mail_subject)
    if result is None:
        ub.update_download(book_id, int(current_user.id))
        # Track email/send activity (best effort; failures are only logged).
        try:
            from scripts.cwa_db import CWA_DB
            book = calibre_db.get_book(book_id)
            cwa_db = CWA_DB()
            cwa_db.log_activity(
                user_id=int(current_user.id),
                user_name=current_user.name,
                event_type='EMAIL',
                item_id=book_id,
                item_title=book.title if book else 'Unknown',
                extra_data=book_format.upper()
            )
        except Exception as e:
            log.debug(f"Failed to log email activity: {e}")
        response = [{'type': "success", 'message': _("Success! Book queued for sending to the selected address(es)!")}]
    else:
        response = [{'type': "danger", 'message': _("Oops! There was an error sending book: %(res)s", res=result)}]
    return Response(json.dumps(response), mimetype='application/json')
# ################################### Login Logout ##################################################################
@web.route('/register', methods=['POST'])
@limiter.limit("40/day", key_func=get_remote_address)
@limiter.limit("3/minute", key_func=get_remote_address)
def register_post():
    """Handle the public self-registration form.

    Aborts with 404 when public registration is disabled. On success a user
    with a random password is stored, the registration mail is sent and the
    client is redirected to the login page; every failure re-renders the
    registration form with a flashed error.
    """
    def _show_form(**context):
        # All failure paths render the same template; only the rate-limiter
        # paths additionally pass ``config``.
        return render_title_template('register.html', title=_("Register"), page="register", **context)

    if not config.config_public_reg:
        abort(404)
    form_data = request.form.to_dict()

    try:
        limiter.check()
    except RateLimitExceeded:
        flash(_(u"Please wait one minute to register next user"), category="error")
        return _show_form(config=config)
    except Exception as e:
        log.error("Connection error to limiter backend: %s", e)
        flash(_("Connection error to limiter backend, please contact your administrator"), category="error")
        return _show_form(config=config)

    if current_user is not None and current_user.is_authenticated:
        return redirect(url_for('web.index'))
    if not config.get_mail_server_configured():
        flash(_("Oops! Email server is not configured, please contact your administrator."), category="error")
        return _show_form()

    raw_email = form_data.get("email", "")
    # Depending on configuration the e-mail address doubles as the user name.
    if config.config_register_email:
        nickname = strip_whitespaces(raw_email)
    else:
        nickname = form_data.get('name')
    if not nickname or not raw_email:
        flash(_("Oops! Please complete all fields."), category="error")
        return _show_form()

    try:
        nickname = check_username(nickname)
        email = check_email(raw_email)
    except Exception as ex:
        flash(str(ex), category="error")
        return _show_form()

    new_user = ub.User()
    if not check_valid_domain(email):
        flash(_("Oops! Your Email is not allowed."), category="error")
        log.warning('Registering failed for user "{}" Email: {}'.format(nickname, raw_email))
        return _show_form()

    new_user.name = nickname
    new_user.email = email
    password = generate_random_password(config.config_password_min_length)
    new_user.password = generate_password_hash(password)
    new_user.role = config.config_default_role
    new_user.locale = config.config_default_locale
    new_user.sidebar_view = config.config_default_show
    # Default to configured theme for new self-registered users (fallback to caliBlur=1)
    try:
        new_user.theme = getattr(config, 'config_theme', 1)
    except Exception:
        pass

    try:
        ub.session.add(new_user)
        ub.session.commit()
        if feature_support['oauth']:
            register_user_with_oauth(new_user)
        send_registration_mail(strip_whitespaces(raw_email), nickname, password)
    except Exception:
        ub.session.rollback()
        flash(_("Oops! An unknown error occurred. Please try again later."), category="error")
        return _show_form()

    flash(_("Success! Confirmation Email has been sent."), category="success")
    return redirect(url_for('web.login'))
@web.route('/register', methods=['GET'])
def register():
    """Show the self-registration form.

    Aborts with 404 when public registration is disabled and redirects
    signed-in users to the index page.
    """
    if not config.config_public_reg:
        abort(404)

    signed_in = current_user is not None and current_user.is_authenticated
    if signed_in:
        return redirect(url_for('web.index'))

    if config.get_mail_server_configured():
        if feature_support['oauth']:
            register_user_with_oauth()
        return render_title_template('register.html', config=config, title=_("Register"), page="register")

    # Without a mail server the generated password could not be delivered.
    flash(_("Oops! Email server is not configured, please contact your administrator."), category="error")
    return render_title_template('register.html', title=_("Register"), page="register")
def handle_login_user(user, remember, message, category):
    """Finish a successful authentication.

    Logs the user in, records a ``LOGIN`` activity entry (best effort),
    flashes ``message`` with ``category``, clears the current rate-limit
    counters and the login redirect counter, and returns a redirect to the
    form's ``next`` target (falling back to ``web.index``).
    """
    login_user(user, remember=remember)

    # Track login activity; failures are only logged.
    try:
        from scripts.cwa_db import CWA_DB
        CWA_DB().log_activity(
            user_id=int(user.id),
            user_name=user.name,
            event_type='LOGIN'
        )
    except Exception as e:
        log.debug(f"Failed to log login activity: {e}")

    flash(message, category=category)
    for active_limit in limiter.current_limits:
        limiter.limiter.storage.clear(active_limit.key)
    # Clear login redirect count on successful login
    flask_session.pop('_login_redirect_count', None)

    target = get_redirect_location(request.form.get('next', None), "web.index")
    return redirect(target)
def render_login(username="", password=""):
    """Render the login page.

    Keeps a per-session counter of login page renders; once it exceeds 3 the
    counter is reset and a redirect-loop warning is logged and flashed.

    Args:
        username: Value passed to the template as ``username``.
        password: Value passed to the template as ``password``.
    """
    # Detect authentication redirect loops
    seen = flask_session.get('_login_redirect_count', 0)
    if seen > 3:
        flask_session.pop('_login_redirect_count', None)
        log.warning("Authentication redirect loop detected from IP: %s", request.remote_addr)
        flash(_("Authentication loop detected. If you're experiencing login issues, please contact your administrator."), category="error")
    else:
        flask_session['_login_redirect_count'] = seen + 1

    next_url = request.args.get('next', default=url_for("web.index"), type=str)
    # Never send a freshly logged-in user straight to the logout endpoint.
    if next_url == url_for("web.logout"):
        next_url = url_for("web.index")

    oauth_enabled = feature_support['oauth']
    oauth_check = oauth_bb.oauth_check if oauth_enabled else {}

    # Button label for the generic OAuth provider, if one is registered.
    generic_login_button = None
    if oauth_enabled:
        try:
            # oauthblueprints[2] is the generic OIDC provider (index 0=github, 1=google, 2=generic)
            blueprints_available = hasattr(oauth_bb, 'oauthblueprints') and len(oauth_bb.oauthblueprints) > 2
            if blueprints_available:
                generic_login_button = oauth_bb.oauthblueprints[2].get('login_button') or 'OpenID Connect'
        except (AttributeError, IndexError):
            # Silently fall back to default if oauthblueprints not available
            pass

    return render_title_template(
        'login.html',
        title=_("Login"),
        next_url=next_url,
        config=config,
        username=username,
        password=password,
        oauth_check=oauth_check,
        generic_login_button=generic_login_button,
        mail=config.get_mail_server_configured(),
        page="login",
    )
@web.route('/login', methods=['GET'])
def login():
    """Show the login page, or redirect signed-in users to the index.

    Flashes a configuration error when the selected login type (OAuth or
    LDAP) is not available, but still renders the login template.
    """
    if current_user is not None and current_user.is_authenticated:
        return redirect(url_for('web.index'))

    login_type = config.config_login_type
    if login_type == constants.LOGIN_OAUTH:
        # In OAuth-only mode the login template is still rendered (showing
        # the OAuth options); this prevents infinite redirects to providers.
        if not feature_support['oauth']:
            log.error("OAuth authentication is enabled but OAuth support is not available")
            flash(_("OAuth authentication is not properly configured. Please contact administrator."), category="error")
    elif login_type == constants.LOGIN_LDAP and not services.ldap:
        log.error(u"Cannot activate LDAP authentication")
        flash(_(u"Cannot activate LDAP authentication"), category="error")

    return render_login()
def _log_failed_login(username, ip_address, method):
    """Record a failed login attempt in the CWA activity log (best effort).

    Args:
        username: User name that was submitted.
        ip_address: Remote address of the request.
        method: Label of the authentication path, e.g. ``'LDAP'``.
    """
    try:
        from scripts.cwa_db import CWA_DB
        cwa_db = CWA_DB()
        cwa_db.log_activity(
            user_id=None,
            user_name='Anonymous',
            event_type='LOGIN_FAILED',
            item_id=None,
            item_title=None,
            extra_data=json.dumps({'username_attempted': username, 'ip': ip_address, 'method': method})
        )
    except Exception as e:
        # Activity tracking must never break the login flow.
        log.debug(f"Failed to log failed login attempt: {e}")


@web.route('/login', methods=['POST'])
@limiter.limit("40/day", key_func=lambda: strip_whitespaces(request.form.get('username', "")).lower())
@limiter.limit("3/minute", key_func=lambda: strip_whitespaces(request.form.get('username', "")).lower())
def login_post():
    """Process a submitted login form.

    Handles LDAP login (with optional auto-creation of the local account and
    a local-password fallback when ``bind_user`` returns ``None``), the
    standard password login and the "forgot password" request.

    Returns:
        A redirect on success (via ``handle_login_user``) or when the user is
        already signed in; otherwise the re-rendered login page with a
        flashed message.
    """
    if config.config_disable_standard_login:
        flash(_("Standard login is disabled."), category="error")
        return render_login()
    form = request.form.to_dict()
    username = strip_whitespaces(form.get('username', "")).lower().replace("\n", "").replace("\r", "")
    # Read the password once with a default: a POST without the field must
    # fail as a normal wrong-password attempt, not with a KeyError.
    password = form.get('password', '')
    try:
        limiter.check()
    except RateLimitExceeded:
        flash(_("Please wait one minute before next login"), category="error")
        return render_login(username, password)
    except Exception as e:
        log.error("Connection error to limiter backend: %s", e)
        flash(_("Connection error to limiter backend, please contact your administrator"), category="error")
        return render_login(username, password)
    if current_user is not None and current_user.is_authenticated:
        return redirect(url_for('web.index'))
    if config.config_login_type == constants.LOGIN_LDAP and not services.ldap:
        log.error(u"Cannot activate LDAP authentication")
        flash(_(u"Cannot activate LDAP authentication"), category="error")
    user = ub.session.query(ub.User).filter(func.lower(ub.User.name) == username).first()
    remember_me = bool(form.get('remember_me'))
    if config.config_login_type == constants.LOGIN_LDAP and services.ldap and password != "":
        # Validate username before attempting LDAP authentication
        if not username or not username.strip():
            log.warning("LDAP authentication attempted with empty username")
            flash(_(u"Username cannot be empty"), category="error")
        else:
            # Try LDAP authentication first, regardless of whether user exists locally
            login_result, error = services.ldap.bind_user(username, password)
            if login_result:
                # LDAP authentication successful
                if user:
                    # Existing user - login normally
                    log.debug(u"You are now logged in as: '{}'".format(user.name))
                    return handle_login_user(user,
                                             remember_me,
                                             _(u"you are now logged in as: '%(nickname)s'", nickname=user.name),
                                             "success")
                # New user - create if auto-creation is enabled
                if getattr(config, 'config_ldap_auto_create_users', True):
                    try:
                        # Get user details from LDAP
                        ldap_user_details = services.ldap.get_object_details(username)
                        if ldap_user_details:
                            # Create user using existing LDAP import function
                            from . import admin
                            create_result, __ = admin.ldap_import_create_user(username, ldap_user_details)
                            if create_result:
                                # Get the newly created user
                                user = ub.session.query(ub.User).filter(
                                    func.lower(ub.User.name) == username.lower()).first()
                                if user:
                                    log.info("LDAP auto-created user: '%s'", username)
                                    return handle_login_user(user,
                                                             remember_me,
                                                             _(u"Welcome! Your account has been automatically created. You are now logged in as: '%(nickname)s'", nickname=user.name),
                                                             "success")
                        # If we get here, user creation failed
                        log.error("LDAP auto-creation failed for user '%s'", username)
                        flash(_(u"Authentication successful, but account creation failed. Please contact your administrator."), category="error")
                    except Exception as ex:
                        log.error("LDAP auto-creation error for user '%s': %s", username, ex)
                        flash(_(u"Authentication successful, but account creation failed. Please contact your administrator."), category="error")
                else:
                    # Auto-creation disabled
                    log.info("LDAP user '%s' authenticated but not found locally, auto-creation disabled", username)
                    flash(_(u"Authentication successful, but no local account found. Please contact your administrator to create your account."), category="error")
            elif login_result is None and user and check_password_hash(str(user.password), password) \
                    and user.name != "Guest":
                # LDAP unavailable, try local fallback
                log.info("Local Fallback Login as: '{}'".format(user.name))
                return handle_login_user(user,
                                         remember_me,
                                         _(u"Fallback Login as: '%(nickname)s', "
                                           u"LDAP Server not reachable, or user not known", nickname=user.name),
                                         "warning")
            elif login_result is None:
                # LDAP unavailable and no local fallback
                log.info(error)
                flash(_(u"Could not login: %(message)s", message=error), category="error")
            else:
                # LDAP authentication failed
                # Use request.remote_addr (already corrected by ProxyFix) instead of raw header
                ip_address = request.remote_addr
                log.warning('LDAP Login failed for user "%s" IP-address: %s', username, ip_address)
                _log_failed_login(username, ip_address, 'LDAP')
                # Flash exactly once; the message used to be emitted twice.
                flash(_(u"Wrong Username or Password"), category="error")
    else:
        # Use request.remote_addr (already corrected by ProxyFix) instead of raw header
        ip_address = request.remote_addr
        if form.get('forgot', "") == 'forgot':
            if user is not None and user.name != "Guest":
                ret, __ = reset_password(user.id)
                if ret == 1:
                    flash(_(u"New Password was sent to your email address"), category="info")
                    log.info('Password reset for user "%s" IP-address: %s', username, ip_address)
                else:
                    log.error(u"An unknown error occurred. Please try again later")
                    flash(_(u"An unknown error occurred. Please try again later."), category="error")
            else:
                flash(_(u"Please enter valid username to reset password"), category="error")
                log.warning('Username missing for password reset IP-address: %s', ip_address)
        elif user and check_password_hash(str(user.password), password) and user.name != "Guest":
            config.config_is_initial = False
            log.debug(u"You are now logged in as: '{}'".format(user.name))
            return handle_login_user(user,
                                     remember_me,
                                     _(u"You are now logged in as: '%(nickname)s'", nickname=user.name),
                                     "success")
        else:
            log.warning('Login failed for user "{}" IP-address: {}'.format(username, ip_address))
            _log_failed_login(username, ip_address, 'standard')
            flash(_(u"Wrong Username or Password"), category="error")
    return render_login(username, password)
@web.route('/logout')
@user_login_required
def logout():
    """Sign the current user out and redirect.

    With anonymous browsing enabled the ``next`` query argument is honoured
    (resolved through ``get_redirect_location``); otherwise the client is
    sent to the login page.
    """
    if current_user is not None and current_user.is_authenticated:
        if feature_support['oauth'] and config.config_login_type in (2, 3):
            logout_oauth_user()
        ub.delete_user_session(current_user.id, flask_session.get('_id', ""))
        logout_user()
        # Clear login redirect count on logout to prevent false positives
        flask_session.pop('_login_redirect_count', None)
    log.debug("User logged out")

    destination = None
    if config.config_anonbrowse:
        destination = get_redirect_location(request.args.get('next', None), "web.login")
    if not destination:
        destination = url_for('web.login')
    return redirect(destination)
# ################################### Users own configuration #########################################################
def change_profile(kobo_support, hardcover_support, local_oauth_check, oauth_status, translations, languages):
    """Apply the submitted profile form to ``current_user`` and persist it.

    Updates credentials, e-mail settings, locale, Kobo/Hardcover options,
    magic shelf visibility, theme, OPDS root order and hidden entries, and
    the sidebar bit mask.

    Args:
        kobo_support, hardcover_support, local_oauth_check, oauth_status,
        translations, languages: Passed through to the ``user_edit.html``
            template when validation fails.

    Returns:
        The re-rendered profile form when validation raises, otherwise a
        redirect to the profile page. Database errors are flashed before the
        redirect, so the view always yields a valid response.
    """
    to_save = request.form.to_dict()
    current_user.random_books = 0
    try:
        if current_user.role_passwd() or current_user.role_admin():
            if to_save.get("password", "") != "":
                current_user.password = generate_password_hash(valid_password(to_save.get("password")))
        if to_save.get("kindle_mail", current_user.kindle_mail) != current_user.kindle_mail:
            current_user.kindle_mail = valid_email(to_save.get("kindle_mail"))
        if to_save.get("kindle_mail_subject", current_user.kindle_mail_subject) != current_user.kindle_mail_subject:
            current_user.kindle_mail_subject = strip_whitespaces(to_save.get("kindle_mail_subject", "")) or ""
        new_email = valid_email(to_save.get("email", current_user.email))
        if not new_email:
            raise Exception(_("Email can't be empty and has to be a valid Email"))
        if new_email != current_user.email:
            current_user.email = check_email(new_email)
        if current_user.role_admin():
            if to_save.get("name", current_user.name) != current_user.name:
                # Query username, if not existing, change
                current_user.name = check_username(to_save.get("name"))
        current_user.random_books = 1 if to_save.get("show_random") == "on" else 0
        current_user.default_language = to_save.get("default_language", "all")
        current_user.locale = to_save.get("locale", "en")
        old_state = current_user.kobo_only_shelves_sync
        # 1 -> 0: nothing has to be done
        # 0 -> 1: all synced books have to be added to archived books, + currently synced shelfs which
        # don't have to be synced have to be removed (added to Shelf archive)
        current_user.kobo_only_shelves_sync = int(to_save.get("kobo_only_shelves_sync") == "on") or 0
        if old_state == 0 and current_user.kobo_only_shelves_sync == 1:
            kobo_sync_status.update_on_sync_shelfs(current_user.id)
        current_user.hardcover_token = to_save.get("hardcover_token", "").replace("Bearer ", "") or None

        # Auto-send and metadata fetch settings
        current_user.auto_send_enabled = to_save.get("auto_send_enabled") == "on"
        current_user.auto_metadata_fetch = to_save.get("auto_metadata_fetch") == "on"

        # Handle hidden magic shelf templates and custom shelves
        from . import magic_shelf
        if not current_user.is_anonymous:
            # Get all system template keys
            all_template_keys = set(magic_shelf.SYSTEM_SHELF_TEMPLATES.keys())

            # Get currently hidden items for this user
            current_hidden = ub.session.query(ub.HiddenMagicShelfTemplate).filter(
                ub.HiddenMagicShelfTemplate.user_id == current_user.id
            ).all()
            current_hidden_template_keys = {h.template_key for h in current_hidden if h.template_key}
            current_hidden_shelf_ids = {h.shelf_id for h in current_hidden if h.shelf_id}

            # Handle system templates: an unchecked box means "hidden".
            visible_template_keys = {key for key in all_template_keys
                                     if to_save.get(f"show_magic_shelf_{key}") == "on"}
            should_be_hidden_templates = all_template_keys - visible_template_keys

            # Add newly hidden templates
            for key in should_be_hidden_templates:
                if key not in current_hidden_template_keys:
                    new_hidden = ub.HiddenMagicShelfTemplate(
                        user_id=current_user.id,
                        template_key=key
                    )
                    ub.session.add(new_hidden)
                    log.info(f"User {current_user.id} hid system shelf template '{key}'")

            # Remove templates that should no longer be hidden
            for hidden in current_hidden:
                if hidden.template_key and hidden.template_key in visible_template_keys:
                    ub.session.delete(hidden)
                    log.info(f"User {current_user.id} unhid system shelf template '{hidden.template_key}'")

            # Handle custom public shelves - get all available ones.
            # ``== False`` is a SQLAlchemy column comparison, not a Python
            # truth test, and must stay as written.
            all_public_shelves = ub.session.query(ub.MagicShelf).filter(
                ub.MagicShelf.is_public == 1,
                ub.MagicShelf.user_id != current_user.id,
                ub.MagicShelf.is_system == False
            ).all()

            # Check which ones should be visible (checked)
            visible_shelf_ids = {s.id for s in all_public_shelves
                                 if to_save.get(f"show_custom_shelf_{s.id}") == "on"}

            # Hide shelves that are unchecked but not currently hidden
            for shelf in all_public_shelves:
                if shelf.id not in visible_shelf_ids and shelf.id not in current_hidden_shelf_ids:
                    new_hidden = ub.HiddenMagicShelfTemplate(
                        user_id=current_user.id,
                        shelf_id=shelf.id
                    )
                    ub.session.add(new_hidden)
                    log.info(f"User {current_user.id} hid custom shelf {shelf.id}")

            # Unhide shelves that are checked but currently hidden
            for hidden in current_hidden:
                if hidden.shelf_id and hidden.shelf_id in visible_shelf_ids:
                    ub.session.delete(hidden)
                    log.info(f"User {current_user.id} unhid custom shelf {hidden.shelf_id}")

        # Theme change: only the values 0 and 1 are accepted.
        if 'theme' in to_save:
            try:
                new_theme = int(to_save.get('theme'))
                if new_theme in (0, 1):
                    current_user.theme = new_theme
            except Exception:
                pass

        # OPDS root order
        opds_order_raw = to_save.get("opds_root_order", "").strip()
        if opds_order_raw:
            from .opds import normalize_opds_root_order
            opds_order_list = [item.strip() for item in opds_order_raw.split(',') if item.strip()]
            normalized_order = normalize_opds_root_order(opds_order_list)
            if current_user.view_settings is None:
                current_user.view_settings = {}
            current_user.view_settings.setdefault('opds', {})['root_order'] = normalized_order
            # In-place changes to the settings dict are flagged explicitly.
            flag_modified(current_user, "view_settings")
        else:
            if current_user.view_settings and current_user.view_settings.get('opds', {}).get('root_order'):
                current_user.view_settings['opds'].pop('root_order', None)
                if not current_user.view_settings['opds']:
                    current_user.view_settings.pop('opds', None)
                flag_modified(current_user, "view_settings")

        # OPDS hidden entries
        opds_hidden_raw = to_save.get("opds_hidden_entries", "").strip()
        if opds_hidden_raw:
            from .opds import OPDS_ROOT_ENTRY_DEFS
            hidden_entries = [item.strip() for item in opds_hidden_raw.split(',') if item.strip()]
            # Drop keys that are not known OPDS root entries.
            hidden_entries = [key for key in hidden_entries if key in OPDS_ROOT_ENTRY_DEFS]
            if current_user.view_settings is None:
                current_user.view_settings = {}
            current_user.view_settings.setdefault('opds', {})['hidden_entries'] = hidden_entries
            flag_modified(current_user, "view_settings")
        else:
            if current_user.view_settings and current_user.view_settings.get('opds', {}).get('hidden_entries'):
                current_user.view_settings['opds'].pop('hidden_entries', None)
                if not current_user.view_settings['opds']:
                    current_user.view_settings.pop('opds', None)
                flag_modified(current_user, "view_settings")
    except Exception as ex:
        flash(str(ex), category="error")
        return render_title_template("user_edit.html",
                                     content=current_user,
                                     config=config,
                                     translations=translations,
                                     profile=1,
                                     languages=languages,
                                     title=_(f"{current_user.name.capitalize()}'s Profile", name=current_user.name),
                                     page="me",
                                     kobo_support=kobo_support,
                                     hardcover_support=hardcover_support,
                                     registered_oauth=local_oauth_check,
                                     oauth_status=oauth_status)

    # Sidebar visibility is a bit mask: every checked "show_<int>" box adds
    # its integer. Magic/custom shelf checkboxes share the prefix but are
    # handled above; other non-numeric keys are skipped with a warning.
    val = 0
    for key in to_save:
        if key.startswith('show') and not key.startswith(('show_magic_shelf_', 'show_custom_shelf_')):
            try:
                val += int(key[5:])
            except (ValueError, IndexError):
                log.warning(f"Skipping invalid sidebar checkbox key: {key}")
                continue
    current_user.sidebar_view = val
    if to_save.get("Show_detail_random"):
        current_user.sidebar_view += constants.DETAIL_RANDOM

    try:
        ub.session.commit()
        flash(_("Success! Profile Updated"), category="success")
        log.debug("Profile updated")
    except IntegrityError:
        ub.session.rollback()
        flash(_("Oops! An account already exists for this Email."), category="error")
        log.debug("Found an existing account for this Email")
    except OperationalError as e:
        ub.session.rollback()
        log.error("Database error: %s", e)
        flash(_("Oops! Database Error: %(error)s.", error=e), category="error")
    # Redirect in every case: on success this refreshes the sidebar with the
    # updated shelf visibility, and on a database error it shows the flashed
    # message instead of returning None from the view.
    return redirect(url_for('web.profile'))
@web.route("/me", methods=["GET", "POST"])
@user_login_required
def profile():
    """Show the current user's profile page, or apply a submitted form.

    POST requests are delegated to ``change_profile``. GET requests gather
    the magic shelf visibility and OPDS root settings and render
    ``user_edit.html``.
    """
    languages = calibre_db.speaking_language()
    translations = get_available_locale()
    kobo_support = feature_support['kobo'] and config.config_kobo_sync
    hardcover_support = feature_support['hardcover']

    oauth_status = None
    local_oauth_check = {}
    if feature_support['oauth'] and config.config_login_type == 2:
        oauth_status = get_oauth_status()
        local_oauth_check = oauth_bb.oauth_check

    if request.method == "POST":
        return change_profile(kobo_support, hardcover_support, local_oauth_check, oauth_status, translations, languages)

    # Magic shelf visibility for this user.
    from . import magic_shelf
    hidden_rows = ub.session.query(
        ub.HiddenMagicShelfTemplate.template_key,
        ub.HiddenMagicShelfTemplate.shelf_id
    ).filter(
        ub.HiddenMagicShelfTemplate.user_id == current_user.id
    ).all()
    hidden_shelf_templates = set()
    hidden_custom_shelf_ids = set()
    for row in hidden_rows:
        if row.template_key:
            hidden_shelf_templates.add(row.template_key)
        if row.shelf_id:
            hidden_custom_shelf_ids.add(row.shelf_id)

    # Get ALL public custom shelves that user doesn't own (both hidden and visible)
    public_shelves = ub.session.query(ub.MagicShelf).filter(
        ub.MagicShelf.is_public == 1,
        ub.MagicShelf.user_id != current_user.id,
        ub.MagicShelf.is_system == False
    ).all()

    # Separate into hidden and visible, keeping the query order.
    hidden_custom_shelves = []
    visible_public_shelves = []
    for shelf in public_shelves:
        if shelf.id in hidden_custom_shelf_ids:
            hidden_custom_shelves.append(shelf)
        else:
            visible_public_shelves.append(shelf)

    from .opds import get_opds_root_order_for_user, get_opds_hidden_entries_for_user, OPDS_ROOT_ENTRY_DEFS, OPDS_ROOT_ORDER_DEFAULT
    opds_root_order_string = ",".join(get_opds_root_order_for_user(current_user))
    opds_hidden_entries_string = ",".join(list(get_opds_hidden_entries_for_user(current_user)))
    opds_root_labels = []
    for entry_key in OPDS_ROOT_ORDER_DEFAULT:
        if entry_key in OPDS_ROOT_ENTRY_DEFS:
            opds_root_labels.append({
                "key": entry_key,
                "label": _(OPDS_ROOT_ENTRY_DEFS[entry_key]['title'])
            })

    template_context = dict(
        translations=translations,
        profile=1,
        languages=languages,
        content=current_user,
        config=config,
        kobo_support=kobo_support,
        hardcover_support=hardcover_support,
        system_shelf_templates=magic_shelf.SYSTEM_SHELF_TEMPLATES,
        hidden_shelf_templates=hidden_shelf_templates,
        hidden_custom_shelf_ids=hidden_custom_shelf_ids,
        hidden_custom_shelves=hidden_custom_shelves,
        visible_public_shelves=visible_public_shelves,
        opds_root_order_string=opds_root_order_string,
        opds_hidden_entries_string=opds_hidden_entries_string,
        opds_root_labels=opds_root_labels,
        title=_(f"{current_user.name.capitalize()}'s Profile", name=current_user.name),
        page="me",
        registered_oauth=local_oauth_check,
        oauth_status=oauth_status,
    )
    return render_title_template("user_edit.html", **template_context)
# ###################################Show single book ##################################################################
@web.route("/read/<int:book_id>/<book_format>")
@login_required_if_no_ano
@viewer_required
def read_book(book_id, book_format):
    """Open the in-browser reader matching ``book_format``.

    Renders the EPUB, PDF, TXT, DjVu, audio or comic reader template. When
    the book is not accessible or the format has no reader, an error is
    flashed and the client is redirected to the index page. For signed-in
    users a ``READ`` activity entry is recorded (best effort).
    """
    book = calibre_db.get_filtered_book(book_id)
    if not book:
        flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
              category="error")
        log.debug("Selected book is unavailable. File does not exist or is not accessible")
        return redirect(url_for("web.index"))

    book.ordered_authors = calibre_db.order_authors([book], False)

    # check if book has a bookmark
    bookmark = None
    if current_user.is_authenticated:
        bookmark = ub.session.query(ub.Bookmark).filter(and_(ub.Bookmark.user_id == int(current_user.id),
                                                             ub.Bookmark.book_id == book_id,
                                                             ub.Bookmark.format == book_format.upper())).first()

    # Track read activity
    if current_user.is_authenticated:
        try:
            from scripts.cwa_db import CWA_DB
            import json
            # Detect source of book discovery: explicit ``from`` argument
            # first, otherwise the first matching Referer path fragment.
            source = request.args.get('from', 'direct')
            referer = request.headers.get('Referer', '')
            if not source or source == 'direct':
                for origin in ('search', 'series', 'author', 'category', 'shelf'):
                    if '/' + origin in referer:
                        source = origin
                        break
            activity_db = CWA_DB()
            activity_db.log_activity(
                user_id=int(current_user.id),
                user_name=current_user.name,
                event_type='READ',
                item_id=book_id,
                item_title=book.title,
                extra_data=json.dumps({'format': book_format.upper(), 'source': source})
            )
        except Exception as e:
            log.debug(f"Failed to log read activity: {e}")

    fmt = book_format.lower()
    if fmt == "epub":
        log.debug("Start epub reader for %d", book_id)
        return render_title_template('read.html', bookid=book_id, title=book.title, bookmark=bookmark)
    if fmt == "pdf":
        log.debug("Start pdf reader for %d", book_id)
        return render_title_template('readpdf.html', pdffile=book_id, title=book.title)
    if fmt == "txt":
        log.debug("Start txt reader for %d", book_id)
        return render_title_template('readtxt.html', txtfile=book_id, title=book.title)
    if fmt in ["djvu", "djv"]:
        log.debug("Start djvu reader for %d", book_id)
        return render_title_template('readdjvu.html', djvufile=book_id, title=book.title,
                                     extension=fmt)

    if fmt in constants.EXTENSIONS_AUDIO:
        entries = calibre_db.get_filtered_book(book_id)
        log.debug("Start mp3 listening for %d", book_id)
        return render_title_template('listenmp3.html', mp3file=book_id, audioformat=fmt,
                                     entry=entries, bookmark=bookmark)

    if fmt in ["cbr", "cbt", "cbz"]:
        # Comic title: "<title> - <series> #<index>", index without
        # trailing zeros.
        display_title = book.title
        if len(book.series):
            display_title = display_title + " - " + book.series[0].name
            if book.series_index:
                index_text = '{0:.2f}'.format(book.series_index).rstrip('0').rstrip('.')
                display_title = display_title + " #" + index_text
        log.debug("Start comic reader for %d", book_id)
        return render_title_template('readcbr.html', comicfile=str(book_id), title=display_title,
                                     extension=fmt, bookmark=bookmark)

    log.debug("Selected book is unavailable. File does not exist or is not accessible")
    flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
          category="error")
    return redirect(url_for("web.index"))
@web.route("/book/<int:book_id>")
@login_required_if_no_ano
def show_book(book_id):
    """Render the detail page for a single book.

    The book is fetched together with its read and archived state, then the
    entry is decorated with the extra attributes set below (read/archived
    flags, localized language names, sorted and per-user filtered tags,
    ordered authors, send-to-eReader list, reader format list and audio
    formats) before ``detail.html`` is rendered.

    If ``book_id`` cannot be coerced to ``int`` or the lookup returns nothing,
    an error message is flashed and the user is redirected to ``web.index``.
    """
    # Ensure book_id is a plain int to avoid SQLite binding errors
    try:
        book_id = int(book_id)
    except (ValueError, TypeError):
        log.error(f"Invalid book_id passed to show_book: {book_id}")
        flash(_("Invalid book ID."), category="error")
        return redirect(url_for("web.index"))

    entries = calibre_db.get_book_read_archived(book_id, config.config_read_column, allow_show_archived=True)

    # Guard clause: nothing to show, bounce back to the index with an error.
    if not entries:
        log.debug("Selected book is unavailable. File does not exist or is not accessible")
        flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
              category="error")
        return redirect(url_for("web.index"))

    # The lookup result is indexed as (book, read status, archived flag).
    entry = entries[0]
    read_state = entries[1]
    archived_state = entries[2]
    entry.read_status = read_state == ub.ReadBook.STATUS_FINISHED
    entry.is_archived = archived_state

    # Attach a display name in the current locale to every language of the book.
    for language in entry.languages:
        language.language_name = isoLanguages.get_language_name(get_locale(), language.lang_code)

    cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)

    # Collect the shelf of every BookShelf row that references this book.
    shelf_links = ub.session.query(ub.BookShelf).filter(ub.BookShelf.book_id == book_id).all()
    book_in_shelves = [link.shelf for link in shelf_links]

    entry.tags = sort(entry.tags, key=lambda tag: tag.name)

    # Filter tags based on user's allowed/denied tags (Issue #906)
    # NOTE(review): if entry.tags is an ORM-mapped collection, replacing it
    # with a filtered list could register tag removals on the session --
    # confirm nothing flushes or commits that session afterwards.
    if current_user.is_authenticated:
        allowed_tags = current_user.list_allowed_tags()
        denied_tags = current_user.list_denied_tags()
        # A list holding only an empty string is treated as "no rule set".
        if allowed_tags and allowed_tags != ['']:
            # Allow-list configured: keep only the tags named in it.
            entry.tags = [kept for kept in entry.tags if kept.name in allowed_tags]
        if denied_tags and denied_tags != ['']:
            # Deny-list configured: drop every tag named in it.
            entry.tags = [kept for kept in entry.tags if kept.name not in denied_tags]

    entry.ordered_authors = calibre_db.order_authors([entry])
    entry.email_share_list = check_send_to_ereader(entry)
    entry.reader_list = check_read_formats(entry)

    # Lower-cased names of the book's formats that are listed as audio extensions.
    entry.audio_entries = [
        media_format.format.lower()
        for media_format in entry.data
        if media_format.format.lower() in constants.EXTENSIONS_AUDIO
    ]

    cwa_settings = CWA_DB().cwa_settings
    requested_via_xhr = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    return render_title_template(
        'detail.html',
        entry=entry,
        cc=cc,
        is_xhr=requested_via_xhr,
        title=entry.title,
        books_shelfs=book_in_shelves,
        cwa_settings=cwa_settings,
        page="book",
    )