Files
crocodilestick 98643dd7c7 Fix Calibre 9 schema crash (books.isbn/flags) with safe ISBN fallback
Calibre 9 removed books.isbn, books.flags, and books.lccn from metadata.db. CWA’s ORM still mapped these fields, so SQLAlchemy emitted SELECTs for missing columns and the UI crashed with 500s (issues #954/#956/#958/#967/#979). This change removes the obsolete column mappings, reads ISBNs from the identifiers table, and adds a backward‑compatible fallback to books.isbn when it exists.

I also detect the presence of books.isbn via PRAGMA calibre.table_info(books) at startup to avoid repeated lookup failures. This restores Calibre 9 compatibility while keeping older libraries working.

[bug]
Fixes #979
2026-01-31 13:53:25 +01:00

1337 lines
57 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
# Calibre-Web Automated fork of Calibre-Web
# Copyright (C) 2018-2025 Calibre-Web contributors
# Copyright (C) 2024-2025 Calibre-Web Automated contributors
# SPDX-License-Identifier: GPL-3.0-or-later
# See CONTRIBUTORS for full list of authors.
import os
import re
import json
import time
import threading
from datetime import datetime, timezone
from urllib.parse import quote
import unidecode
from weakref import WeakSet
from uuid import uuid4
from sqlite3 import OperationalError as sqliteOperationalError
import sqlite3
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, ForeignKey, CheckConstraint
from sqlalchemy import String, Integer, Boolean, TIMESTAMP, Float
from sqlalchemy.orm import relationship, sessionmaker, scoped_session, joinedload, object_session
from sqlalchemy.orm.collections import InstrumentedList
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.exc import OperationalError
try:
# Compatibility with sqlalchemy 2.0
from sqlalchemy.orm import declarative_base
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.pool import StaticPool
from sqlalchemy.sql.expression import and_, true, false, text, func, or_
from sqlalchemy.ext.associationproxy import association_proxy
from .cw_login import current_user
from flask_babel import gettext as _
from flask_babel import get_locale
from flask import flash
from . import logger, ub, isoLanguages
from .pagination import Pagination
from .string_helper import strip_whitespaces
log = logger.create()
cc_exceptions = ['composite', 'series']
cc_classes = {}
Base = declarative_base()
books_authors_link = Table('books_authors_link', Base.metadata,
Column('book', Integer, ForeignKey('books.id'), primary_key=True),
Column('author', Integer, ForeignKey('authors.id'), primary_key=True)
)
books_tags_link = Table('books_tags_link', Base.metadata,
Column('book', Integer, ForeignKey('books.id'), primary_key=True),
Column('tag', Integer, ForeignKey('tags.id'), primary_key=True)
)
books_series_link = Table('books_series_link', Base.metadata,
Column('book', Integer, ForeignKey('books.id'), primary_key=True),
Column('series', Integer, ForeignKey('series.id'), primary_key=True)
)
books_ratings_link = Table('books_ratings_link', Base.metadata,
Column('book', Integer, ForeignKey('books.id'), primary_key=True),
Column('rating', Integer, ForeignKey('ratings.id'), primary_key=True)
)
books_languages_link = Table('books_languages_link', Base.metadata,
Column('book', Integer, ForeignKey('books.id'), primary_key=True),
Column('lang_code', Integer, ForeignKey('languages.id'), primary_key=True)
)
books_publishers_link = Table('books_publishers_link', Base.metadata,
Column('book', Integer, ForeignKey('books.id'), primary_key=True),
Column('publisher', Integer, ForeignKey('publishers.id'), primary_key=True)
)
class Library_Id(Base):
__tablename__ = 'library_id'
id = Column(Integer, primary_key=True)
uuid = Column(String, nullable=False)
class Identifiers(Base):
__tablename__ = 'identifiers'
id = Column(Integer, primary_key=True)
type = Column(String(collation='NOCASE'), nullable=False, default="isbn")
val = Column(String(collation='NOCASE'), nullable=False)
book = Column(Integer, ForeignKey('books.id'), nullable=False)
def __init__(self, val, id_type, book):
super().__init__()
self.val = val
self.type = id_type
self.book = book
def format_type(self):
format_type = self.type.lower()
if format_type == 'amazon':
return "Amazon"
elif format_type.startswith("amazon_"):
return "Amazon.{0}".format(format_type[7:].lower().replace("uk","co.uk"))
elif format_type == "isbn":
return "ISBN"
elif format_type == "doi":
return "DOI"
elif format_type == "douban":
return "Douban"
elif format_type == "goodreads":
return "Goodreads"
elif format_type == "babelio":
return "Babelio"
elif format_type == "google":
return "Google Books"
elif format_type == "kobo":
return "Kobo"
elif format_type == "barnesnoble":
return "Barnes & Noble"
elif format_type == "litres":
return "ЛитРес"
elif format_type == "issn":
return "ISSN"
elif format_type == "isfdb":
return "ISFDB"
elif format_type == "lubimyczytac":
return "Lubimyczytac"
elif format_type == "databazeknih":
return "Databáze knih"
elif format_type == "hardcover-slug":
return "Hardcover"
elif format_type == "storygraph":
return "StoryGraph"
elif format_type == "smashwords":
return "Smashwords"
elif format_type == "ebooks":
return "Ebooks.com"
else:
return self.type
def __repr__(self):
format_type = self.type.lower()
if format_type == "amazon" or format_type == "asin":
return "https://amazon.com/dp/{0}".format(self.val)
elif format_type.startswith('amazon_'):
return "https://amazon.{0}/dp/{1}".format(format_type[7:].lower().replace("uk","co.uk"), self.val)
elif format_type == "isbn":
return "https://www.worldcat.org/isbn/{0}".format(self.val)
elif format_type == "doi":
return "https://dx.doi.org/{0}".format(self.val)
elif format_type == "goodreads":
return "https://www.goodreads.com/book/show/{0}".format(self.val)
elif format_type == "babelio":
return "https://www.babelio.com/livres/titre/{0}".format(self.val)
elif format_type == "douban":
return "https://book.douban.com/subject/{0}".format(self.val)
elif format_type == "google":
return "https://books.google.com/books?id={0}".format(self.val)
elif format_type == "kobo":
return "https://www.kobo.com/ebook/{0}".format(self.val)
elif format_type == "barnesnoble":
return "https://www.barnesandnoble.com/w/{0}".format(self.val)
elif format_type == "lubimyczytac":
return "https://lubimyczytac.pl/ksiazka/{0}/ksiazka".format(self.val)
elif format_type == "litres":
return "https://www.litres.ru/{0}".format(self.val)
elif format_type == "issn":
return "https://portal.issn.org/resource/ISSN/{0}".format(self.val)
elif format_type == "isfdb":
return "https://www.isfdb.org/cgi-bin/pl.cgi?{0}".format(self.val)
elif format_type == "databazeknih":
return "https://www.databazeknih.cz/knihy/{0}".format(self.val)
elif format_type == "hardcover-slug":
return "https://hardcover.app/books/{0}".format(self.val)
elif format_type == "ibdb":
return "https://ibdb.dev/book/{0}".format(self.val)
elif format_type == "storygraph":
return "https://app.thestorygraph.com/books/{0}".format(self.val)
elif format_type == "smashwords":
return "https://www.smashwords.com/books/view/{0}".format(self.val)
elif format_type == "ebooks":
return "https://www.ebooks.com/en-{0}".format(self.val)
elif self.val.lower().startswith("javascript:"):
return quote(self.val)
elif self.val.lower().startswith("data:"):
link, __, __ = str.partition(self.val, ",")
return link
else:
return "{0}".format(self.val)
class Comments(Base):
__tablename__ = 'comments'
id = Column(Integer, primary_key=True)
book = Column(Integer, ForeignKey('books.id'), nullable=False, unique=True)
text = Column(String(collation='NOCASE'), nullable=False)
def __init__(self, comment, book):
super().__init__()
self.text = comment
self.book = book
def get(self):
return self.text
def __repr__(self):
return "<Comments({0})>".format(self.text)
class Tags(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(collation='NOCASE'), unique=True, nullable=False)
def __init__(self, name):
super().__init__()
self.name = name
def get(self):
return self.name
def __eq__(self, other):
return self.name == other
def __repr__(self):
return "<Tags('{0})>".format(self.name)
class Authors(Base):
__tablename__ = 'authors'
id = Column(Integer, primary_key=True)
name = Column(String(collation='NOCASE'), unique=True, nullable=False)
sort = Column(String(collation='NOCASE'))
link = Column(String, nullable=False, default="")
def __init__(self, name, sort, link=""):
super().__init__()
self.name = name
self.sort = sort
self.link = link
def get(self):
return self.name
def __eq__(self, other):
return self.name == other
def __repr__(self):
return "<Authors('{0},{1}{2}')>".format(self.name, self.sort, self.link)
class Series(Base):
__tablename__ = 'series'
id = Column(Integer, primary_key=True)
name = Column(String(collation='NOCASE'), unique=True, nullable=False)
sort = Column(String(collation='NOCASE'))
def __init__(self, name, sort):
super().__init__()
self.name = name
self.sort = sort
def get(self):
return self.name
def __eq__(self, other):
return self.name == other
def __repr__(self):
return "<Series('{0},{1}')>".format(self.name, self.sort)
class Ratings(Base):
__tablename__ = 'ratings'
id = Column(Integer, primary_key=True)
rating = Column(Integer, CheckConstraint('rating>-1 AND rating<11'), unique=True)
def __init__(self, rating):
super().__init__()
self.rating = rating
def get(self):
return self.rating
def __eq__(self, other):
return self.rating == other
def __repr__(self):
return "<Ratings('{0}')>".format(self.rating)
class Languages(Base):
__tablename__ = 'languages'
id = Column(Integer, primary_key=True)
lang_code = Column(String(collation='NOCASE'), nullable=False, unique=True)
def __init__(self, lang_code):
super().__init__()
self.lang_code = lang_code
def get(self):
if hasattr(self, "language_name"):
return self.language_name
else:
return self.lang_code
def __eq__(self, other):
return self.lang_code == other
def __repr__(self):
return "<Languages('{0}')>".format(self.lang_code)
class Publishers(Base):
__tablename__ = 'publishers'
id = Column(Integer, primary_key=True)
name = Column(String(collation='NOCASE'), nullable=False, unique=True)
sort = Column(String(collation='NOCASE'))
def __init__(self, name, sort):
super().__init__()
self.name = name
self.sort = sort
def get(self):
return self.name
def __eq__(self, other):
return self.name == other
def __repr__(self):
return "<Publishers('{0},{1}')>".format(self.name, self.sort)
class Data(Base):
__tablename__ = 'data'
__table_args__ = {'schema': 'calibre'}
id = Column(Integer, primary_key=True)
book = Column(Integer, ForeignKey('books.id'), nullable=False)
format = Column(String(collation='NOCASE'), nullable=False)
uncompressed_size = Column(Integer, nullable=False)
name = Column(String, nullable=False)
def __init__(self, book, book_format, uncompressed_size, name):
super().__init__()
self.book = book
self.format = book_format
self.uncompressed_size = uncompressed_size
self.name = name
# ToDo: Check
def get(self):
return self.name
def __repr__(self):
return "<Data('{0},{1}{2}{3}')>".format(self.book, self.format, self.uncompressed_size, self.name)
class Metadata_Dirtied(Base):
__tablename__ = 'metadata_dirtied'
id = Column(Integer, primary_key=True, autoincrement=True)
book = Column(Integer, ForeignKey('books.id'), nullable=False, unique=True)
def __init__(self, book):
super().__init__()
self.book = book
# Import BookFormatChecksum from progress_syncing.models to keep model definition centralized
# Import directly from models module to avoid triggering progress_syncing/__init__.py chain
# which would cause circular import (models needs Base from db, but progress_syncing imports kosync)
from .progress_syncing import models as _progress_models # noqa: E402
BookFormatChecksum = _progress_models.BookFormatChecksum
class Books(Base):
__tablename__ = 'books'
DEFAULT_PUBDATE = datetime(101, 1, 1, 0, 0, 0, 0) # ("0101-01-01 00:00:00+00:00")
_has_isbn_column = None
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(collation='NOCASE'), nullable=False, default='Unknown')
sort = Column(String(collation='NOCASE'))
author_sort = Column(String(collation='NOCASE'))
timestamp = Column(TIMESTAMP, default=lambda: datetime.now(timezone.utc))
pubdate = Column(TIMESTAMP, default=DEFAULT_PUBDATE)
series_index = Column(String, nullable=False, default="1.0")
last_modified = Column(TIMESTAMP, default=lambda: datetime.now(timezone.utc))
path = Column(String, default="", nullable=False)
has_cover = Column(Integer, default=0)
uuid = Column(String)
authors = relationship(Authors, secondary=books_authors_link, backref='books')
tags = relationship(Tags, secondary=books_tags_link, backref='books', order_by="Tags.name")
comments = relationship(Comments, backref='books')
data = relationship(Data, backref='books')
series = relationship(Series, secondary=books_series_link, backref='books')
ratings = relationship(Ratings, secondary=books_ratings_link, backref='books')
languages = relationship(Languages, secondary=books_languages_link, backref='books')
publishers = relationship(Publishers, secondary=books_publishers_link, backref='books')
identifiers = relationship(Identifiers, backref='books')
def __init__(self, title, sort, author_sort, timestamp, pubdate, series_index, last_modified, path, has_cover,
authors, tags, languages=None):
super().__init__()
self.title = title
self.sort = sort
self.author_sort = author_sort
self.timestamp = timestamp
self.pubdate = pubdate
self.series_index = series_index
self.last_modified = last_modified
self.path = path
self.has_cover = (has_cover is not None)
def __repr__(self):
return "<Books('{0},{1}{2}{3}{4}{5}{6}{7}{8}')>".format(self.title, self.sort, self.author_sort,
self.timestamp, self.pubdate, self.series_index,
self.last_modified, self.path, self.has_cover)
@property
def atom_timestamp(self):
return self.timestamp.strftime('%Y-%m-%dT%H:%M:%S+00:00') or ''
@property
def isbn(self):
for identifier in self.identifiers:
if identifier.type and identifier.type.lower() == "isbn":
return identifier.val or ""
if self._has_isbn_column:
session = object_session(self)
if session is not None:
try:
value = session.execute(text("SELECT isbn FROM books WHERE id = :id"),
{"id": self.id}).scalar()
return value or ""
except Exception:
Books._has_isbn_column = False
return ""
return ""
class CustomColumns(Base):
__tablename__ = 'custom_columns'
id = Column(Integer, primary_key=True)
label = Column(String)
name = Column(String)
datatype = Column(String)
mark_for_delete = Column(Boolean)
editable = Column(Boolean)
display = Column(String)
is_multiple = Column(Boolean)
normalized = Column(Boolean)
def get_display_dict(self):
display_dict = json.loads(self.display)
return display_dict
def to_json(self, value, extra, sequence):
content = dict()
content['table'] = "custom_column_" + str(self.id)
content['column'] = "value"
content['datatype'] = self.datatype
content['is_multiple'] = None if not self.is_multiple else "|"
content['kind'] = "field"
content['name'] = self.name
content['search_terms'] = ['#' + self.label]
content['label'] = self.label
content['colnum'] = self.id
content['display'] = self.get_display_dict()
content['is_custom'] = True
content['is_category'] = self.datatype in ['text', 'rating', 'enumeration', 'series']
content['link_column'] = "value"
content['category_sort'] = "value"
content['is_csp'] = False
content['is_editable'] = self.editable
content['rec_index'] = sequence + 22 # toDo why ??
if isinstance(value, datetime):
content['#value#'] = {"__class__": "datetime.datetime",
"__value__": value.strftime("%Y-%m-%dT%H:%M:%S+00:00")}
else:
content['#value#'] = value
content['#extra#'] = extra
content['is_multiple2'] = {} if not self.is_multiple else {"cache_to_list": "|", "ui_to_list": ",",
"list_to_ui": ", "}
return json.dumps(content, ensure_ascii=False)
class AlchemyEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o.__class__, DeclarativeMeta):
# an SQLAlchemy class
fields = {}
for field in [x for x in dir(o) if not x.startswith('_') and x != 'metadata' and x != "password"]:
if field == 'books':
continue
data = o.__getattribute__(field)
try:
if isinstance(data, str):
data = data.replace("'", "\'")
elif isinstance(data, InstrumentedList):
el = list()
# ele = None
for ele in data:
if hasattr(ele, 'value'): # converter for custom_column values
el.append(str(ele.value))
elif ele.get:
el.append(ele.get())
else:
el.append(json.dumps(ele, cls=AlchemyEncoder))
if field == 'authors':
data = " & ".join(el)
else:
data = ",".join(el)
if data == '[]':
data = ""
else:
json.dumps(data)
fields[field] = data
except Exception:
fields[field] = ""
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, o)
class CalibreDB:
_init = False
engine = None
config = None
session_factory = None
# This is a WeakSet so that references here don't keep other CalibreDB
# instances alive once they reach the end of their respective scopes
instances = WeakSet()
_reconnect_lock = threading.RLock() # Reentrant lock to prevent concurrent reconnect operations
def __init__(self, expire_on_commit=True, init=False):
""" Initialize a new CalibreDB session
"""
self.session = None
if init:
self.init_db(expire_on_commit)
def init_db(self, expire_on_commit=True):
if self._init:
self.init_session(expire_on_commit)
self.instances.add(self)
def init_session(self, expire_on_commit=True):
if self.session_factory is None:
log.error("Cannot init session: session_factory is None")
return
self.session = self.session_factory()
self.session.expire_on_commit = expire_on_commit
self.create_functions(self.config)
def ensure_session(self, expire_on_commit=True):
"""Ensure a valid SQLAlchemy session exists.
This protects against brief windows where dispose() nulled the session during a reconnect.
Holds lock during entire recreation to prevent race conditions.
"""
if self.session is not None:
return # Fast path - session already exists
# Session is None - need to recreate it
# Acquire lock to ensure atomic recreation (no interruption by dispose)
with self._reconnect_lock:
# Double-check after acquiring lock (another thread may have recreated it)
if self.session is not None:
return
# Try to recreate session from factory
if self.session_factory is not None:
try:
self.init_session(expire_on_commit)
return # Success
except Exception as ex:
log.error(f"Failed to init session from factory: {ex}")
# Factory is None or init failed - try to rebuild entire database setup
if self.config and getattr(self.config, 'config_calibre_dir', None):
try:
log.warning("Session factory unavailable, attempting to rebuild database setup")
# Note: setup_db will call dispose() which is safe because we hold _reconnect_lock (RLock is reentrant)
self.setup_db(self.config.config_calibre_dir, ub.app_DB_path)
# After setup_db, session_factory should exist, try to init session
if self.session is None and self.session_factory is not None:
self.init_session(expire_on_commit)
if self.session is not None:
return
except Exception as ex:
log.error(f"Failed to rebuild database setup in ensure_session: {ex}")
# Fallback: attempt to initialize from app.db if config is missing or setup failed
if self.session is None and ub.app_DB_path:
try:
log.warning("Session still unavailable; attempting init from app.db")
from .calibre_init import init_calibre_db_from_app_db
if init_calibre_db_from_app_db(ub.app_DB_path):
self.init_session(expire_on_commit)
if self.session is not None:
return
except Exception as ex:
log.error(f"Failed to init session from app.db in ensure_session: {ex}")
# If we still don't have a session, log warning
# Don't raise exception - let caller handle AttributeError if they try to use None session
if self.session is None:
log.error("ensure_session: Unable to create session - session factory and config unavailable")
@classmethod
def setup_db_cc_classes(cls, cc):
cc_ids = []
books_custom_column_links = {}
for row in cc:
if row.datatype not in cc_exceptions:
if row.datatype == 'series':
dicttable = {'__tablename__': 'books_custom_column_' + str(row.id) + '_link',
'id': Column(Integer, primary_key=True),
'book': Column(Integer, ForeignKey('books.id'),
primary_key=True),
'map_value': Column('value', Integer,
ForeignKey('custom_column_' +
str(row.id) + '.id'),
primary_key=True),
'extra': Column(Float),
'asoc': relationship('custom_column_' + str(row.id), uselist=False),
'value': association_proxy('asoc', 'value')
}
books_custom_column_links[row.id] = type(str('books_custom_column_' + str(row.id) + '_link'),
(Base,), dicttable)
if row.datatype in ['rating', 'text', 'enumeration']:
books_custom_column_links[row.id] = Table('books_custom_column_' + str(row.id) + '_link',
Base.metadata,
Column('book', Integer, ForeignKey('books.id'),
primary_key=True),
Column('value', Integer,
ForeignKey('custom_column_' +
str(row.id) + '.id'),
primary_key=True)
)
cc_ids.append([row.id, row.datatype])
ccdict = {'__tablename__': 'custom_column_' + str(row.id),
'id': Column(Integer, primary_key=True)}
if row.datatype == 'float':
ccdict['value'] = Column(Float)
elif row.datatype == 'int':
ccdict['value'] = Column(Integer)
elif row.datatype == 'datetime':
ccdict['value'] = Column(TIMESTAMP)
elif row.datatype == 'bool':
ccdict['value'] = Column(Boolean)
else:
ccdict['value'] = Column(String)
if row.datatype in ['float', 'int', 'bool', 'datetime', 'comments']:
ccdict['book'] = Column(Integer, ForeignKey('books.id'))
cc_classes[row.id] = type(str('custom_column_' + str(row.id)), (Base,), ccdict)
for cc_id in cc_ids:
if cc_id[1] in ['bool', 'int', 'float', 'datetime', 'comments']:
setattr(Books,
'custom_column_' + str(cc_id[0]),
relationship(cc_classes[cc_id[0]],
primaryjoin=(
Books.id == cc_classes[cc_id[0]].book),
backref='books'))
elif cc_id[1] == 'series':
setattr(Books,
'custom_column_' + str(cc_id[0]),
relationship(books_custom_column_links[cc_id[0]],
backref='books'))
else:
setattr(Books,
'custom_column_' + str(cc_id[0]),
relationship(cc_classes[cc_id[0]],
secondary=books_custom_column_links[cc_id[0]],
backref='books'))
return cc_classes
@classmethod
def check_valid_db(cls, config_calibre_dir, app_db_path, config_calibre_uuid):
if not config_calibre_dir:
return False, False
dbpath = os.path.join(config_calibre_dir, "metadata.db")
if not os.path.exists(dbpath):
return False, False
try:
check_engine = create_engine('sqlite://',
echo=False,
isolation_level="SERIALIZABLE",
connect_args={'check_same_thread': False, 'timeout': 30},
poolclass=StaticPool)
with check_engine.begin() as connection:
connection.execute(text("attach database '{}' as calibre;".format(dbpath)))
connection.execute(text("attach database '{}' as app_settings;".format(app_db_path)))
# Try enabling WAL to improve concurrency unless running on a network share
# Controlled by env var NETWORK_SHARE_MODE (default False)
try:
nsm = os.getenv('NETWORK_SHARE_MODE', 'False').lower() in ('1', 'true', 'yes', 'on')
if not nsm:
connection.execute(text("PRAGMA calibre.journal_mode=WAL"))
connection.execute(text("PRAGMA app_settings.journal_mode=WAL"))
except Exception:
pass
local_session = scoped_session(sessionmaker())
local_session.configure(bind=connection)
database_uuid = local_session().query(Library_Id).one_or_none()
# local_session.dispose()
check_engine.connect()
db_change = config_calibre_uuid != database_uuid.uuid
except Exception:
return False, False
return True, db_change
@classmethod
def update_config(cls, config):
cls.config = config
@classmethod
def setup_db(cls, config_calibre_dir, app_db_path):
# Wrap entire method in lock to ensure atomic setup operation
# RLock is reentrant, so nested calls (e.g., from reconnect_db) are safe
with cls._reconnect_lock:
# Always call dispose to clean up old sessions/connections
cls.dispose()
if not config_calibre_dir:
log.error("setup_db failed: config_calibre_dir is None or empty")
if cls.config:
cls.config.invalidate()
return None
dbpath = os.path.join(config_calibre_dir, "metadata.db")
if not os.path.exists(dbpath):
log.error(f"setup_db failed: metadata.db not found at {dbpath}")
if cls.config:
cls.config.invalidate()
return None
try:
cls.engine = create_engine('sqlite://',
echo=False,
isolation_level="SERIALIZABLE",
connect_args={'check_same_thread': False, 'timeout': 30},
poolclass=StaticPool)
with cls.engine.begin() as connection:
connection.execute(text("attach database '{}' as calibre;".format(dbpath)))
connection.execute(text("attach database '{}' as app_settings;".format(app_db_path)))
# Try enabling WAL to improve concurrency unless running on a network share
# Controlled by env var NETWORK_SHARE_MODE (default False)
try:
nsm = os.getenv('NETWORK_SHARE_MODE', 'False').lower() in ('1', 'true', 'yes', 'on')
if not nsm:
connection.execute(text("PRAGMA calibre.journal_mode=WAL"))
connection.execute(text("PRAGMA app_settings.journal_mode=WAL"))
except Exception:
pass
conn = cls.engine.connect()
# conn.text_factory = lambda b: b.decode(errors = 'ignore') possible fix for #1302
except Exception as ex:
log.error(f"setup_db failed during engine creation: {ex}")
if cls.config:
cls.config.invalidate(ex)
return None
if cls.config:
cls.config.db_configured = True
if not cc_classes:
try:
cc = conn.execute(text("SELECT id, datatype FROM custom_columns"))
cls.setup_db_cc_classes(cc)
except OperationalError as e:
log.error_or_exception(e)
return None
try:
cols = conn.execute(text("PRAGMA calibre.table_info(books)")).fetchall()
Books._has_isbn_column = any(row[1] == "isbn" for row in cols)
except Exception:
Books._has_isbn_column = False
cls.session_factory = scoped_session(sessionmaker(autocommit=False,
autoflush=True,
bind=cls.engine, future=True))
for inst in cls.instances:
inst.init_session()
# Ensure progress syncing tables exist in metadata.db (book checksums)
from .progress_syncing.models import ensure_calibre_db_tables
ensure_calibre_db_tables(conn)
cls._init = True
# End of with cls._reconnect_lock
def get_book(self, book_id):
self.ensure_session()
return self.session.query(Books).filter(Books.id == book_id).first()
def get_filtered_book(self, book_id, allow_show_archived=False):
self.ensure_session()
# Eagerly load all relationships to prevent detached instance errors during editing
return (self.session.query(Books)
.options(joinedload(Books.authors),
joinedload(Books.tags),
joinedload(Books.comments),
joinedload(Books.data),
joinedload(Books.series),
joinedload(Books.ratings),
joinedload(Books.languages),
joinedload(Books.publishers),
joinedload(Books.identifiers))
.filter(Books.id == book_id)
.filter(self.common_filters(allow_show_archived))
.first())
def get_book_read_archived(self, book_id, read_column, allow_show_archived=False):
self.ensure_session()
if not read_column:
bd = (self.session.query(Books, ub.ReadBook.read_status, ub.ArchivedBook.is_archived).select_from(Books)
.join(ub.ReadBook, and_(ub.ReadBook.user_id == int(current_user.id), ub.ReadBook.book_id == book_id),
isouter=True))
else:
try:
read_column = cc_classes[read_column]
bd = (self.session.query(Books, read_column.value, ub.ArchivedBook.is_archived).select_from(Books)
.join(read_column, read_column.book == book_id,
isouter=True))
except (KeyError, AttributeError, IndexError):
log.error("Custom Column No.{} does not exist in calibre database".format(read_column))
# Skip linking read column and return None instead of read status
bd = self.session.query(Books, None, ub.ArchivedBook.is_archived)
# Eagerly load the data relationship to prevent session errors
bd = bd.options(joinedload(Books.data))
return (bd.filter(Books.id == book_id)
.join(ub.ArchivedBook, and_(Books.id == ub.ArchivedBook.book_id,
int(current_user.id) == ub.ArchivedBook.user_id), isouter=True)
.filter(self.common_filters(allow_show_archived)).first())
def get_book_by_uuid(self, book_uuid):
self.ensure_session()
return self.session.query(Books).filter(Books.uuid == book_uuid).first()
def get_book_format(self, book_id, file_format):
self.ensure_session()
return self.session.query(Data).filter(Data.book == book_id).filter(Data.format == file_format).first()
def get_author_by_name(self, name):
self.ensure_session()
return self.session.query(Authors).filter(Authors.name == name).first()
def get_tag_by_name(self, name):
self.ensure_session()
return self.session.query(Tags).filter(Tags.name == name).first()
def get_series_by_name(self, name):
self.ensure_session()
return self.session.query(Series).filter(Series.name == name).first()
def get_publisher_by_name(self, name):
self.ensure_session()
return self.session.query(Publishers).filter(Publishers.name == name).first()
def set_metadata_dirty(self, book_id):
self.ensure_session()
if not self.session.query(Metadata_Dirtied).filter(Metadata_Dirtied.book == book_id).one_or_none():
self.session.add(Metadata_Dirtied(book_id))
def delete_dirty_metadata(self, book_id):
self.ensure_session()
try:
self.session.query(Metadata_Dirtied).filter(Metadata_Dirtied.book == book_id).delete()
self.session.commit()
except (OperationalError) as e:
self.session.rollback()
log.error("Database error: {}".format(e))
# Language and content filters for displaying in the UI
def common_filters(self, allow_show_archived=False, return_all_languages=False, viewing_tag_id=None):
if not allow_show_archived:
archived_books = (ub.session.query(ub.ArchivedBook)
.filter(ub.ArchivedBook.user_id==int(current_user.id))
.filter(ub.ArchivedBook.is_archived==True)
.all())
archived_book_ids = [archived_book.book_id for archived_book in archived_books]
archived_filter = Books.id.notin_(archived_book_ids)
else:
archived_filter = true()
if current_user.filter_language() == "all" or return_all_languages:
lang_filter = true()
else:
lang_filter = Books.languages.any(Languages.lang_code == current_user.filter_language())
negtags_list = current_user.list_denied_tags()
postags_list = current_user.list_allowed_tags()
neg_content_tags_filter = false() if negtags_list == [''] else Books.tags.any(Tags.name.in_(negtags_list))
# Issue #906: When viewing a specific tag category, include that tag in allowed tags
if viewing_tag_id is not None and postags_list != ['']:
# Get the tag name for the viewing_tag_id
viewing_tag = self.session.query(Tags).filter(Tags.id == viewing_tag_id).first()
if viewing_tag and viewing_tag.name not in postags_list:
# Temporarily add the viewed tag to the allowed list for this query
postags_list = postags_list + [viewing_tag.name]
pos_content_tags_filter = true() if postags_list == [''] else Books.tags.any(Tags.name.in_(postags_list))
if self.config.config_restricted_column:
try:
pos_cc_list = current_user.allowed_column_value.split(',')
pos_content_cc_filter = true() if pos_cc_list == [''] else \
getattr(Books, 'custom_column_' + str(self.config.config_restricted_column)). \
any(cc_classes[self.config.config_restricted_column].value.in_(pos_cc_list))
neg_cc_list = current_user.denied_column_value.split(',')
neg_content_cc_filter = false() if neg_cc_list == [''] else \
getattr(Books, 'custom_column_' + str(self.config.config_restricted_column)). \
any(cc_classes[self.config.config_restricted_column].value.in_(neg_cc_list))
except (KeyError, AttributeError, IndexError):
pos_content_cc_filter = false()
neg_content_cc_filter = true()
log.error("Custom Column No.{} does not exist in calibre database".format(
self.config.config_restricted_column))
flash(_("Custom Column No.%(column)d does not exist in calibre database",
column=self.config.config_restricted_column),
category="error")
else:
pos_content_cc_filter = true()
neg_content_cc_filter = false()
return and_(lang_filter, pos_content_tags_filter, ~neg_content_tags_filter,
pos_content_cc_filter, ~neg_content_cc_filter, archived_filter)
def generate_linked_query(self, config_read_column, database):
# Safety: session can be briefly None during DB reconnects
self.ensure_session()
if not config_read_column:
query = (self.session.query(database, ub.ArchivedBook.is_archived, ub.ReadBook.read_status)
.select_from(Books)
.outerjoin(ub.ReadBook,
and_(ub.ReadBook.user_id == int(current_user.id), ub.ReadBook.book_id == Books.id)))
else:
try:
read_column = cc_classes[config_read_column]
query = (self.session.query(database, ub.ArchivedBook.is_archived, read_column.value)
.select_from(Books)
.outerjoin(read_column, read_column.book == Books.id))
except (KeyError, AttributeError, IndexError):
log.error("Custom Column No.{} does not exist in calibre database".format(config_read_column))
# Skip linking read column and return None instead of read status
query = self.session.query(database, None, ub.ArchivedBook.is_archived)
return query.outerjoin(ub.ArchivedBook, and_(Books.id == ub.ArchivedBook.book_id,
int(current_user.id) == ub.ArchivedBook.user_id))
@staticmethod
def get_checkbox_sorted(inputlist, state, offset, limit, order, combo=False):
outcome = list()
if combo:
elementlist = {ele[0].id: ele for ele in inputlist}
else:
elementlist = {ele.id: ele for ele in inputlist}
for entry in state:
try:
outcome.append(elementlist[entry])
except KeyError:
pass
del elementlist[entry]
for entry in elementlist:
outcome.append(elementlist[entry])
if order == "asc":
outcome.reverse()
return outcome[offset:offset + limit]
# Fill indexpage with all requested data from database
def fill_indexpage(self, page, pagesize, database, db_filter, order,
join_archive_read=False, config_read_column=0, *join, **kwargs):
self.ensure_session()
return self.fill_indexpage_with_archived_books(page, database, pagesize, db_filter, order, False,
join_archive_read, config_read_column, *join, **kwargs)
def fill_indexpage_with_archived_books(self, page, database, pagesize, db_filter, order, allow_show_archived,
join_archive_read, config_read_column, *join, **kwargs):
self.ensure_session()
viewing_tag_id = kwargs.get('viewing_tag_id')
pagesize = pagesize or self.config.config_books_per_page
if current_user.show_detail_random():
random_query = self.generate_linked_query(config_read_column, database)
# Eagerly load the data relationship for random books to prevent session errors
if database == Books:
random_query = random_query.options(joinedload(Books.data))
randm = (random_query.filter(self.common_filters(allow_show_archived, viewing_tag_id=viewing_tag_id))
.order_by(func.random())
.limit(self.config.config_random_books).all())
else:
randm = false()
if join_archive_read:
query = self.generate_linked_query(config_read_column, database)
else:
query = self.session.query(database)
# Eagerly load the data relationship to prevent DetachedInstanceError in templates
if database == Books:
query = query.options(joinedload(Books.data))
off = int(int(pagesize) * (page - 1))
indx = len(join)
element = 0
while indx:
if indx >= 3:
query = query.outerjoin(join[element], join[element+1]).outerjoin(join[element+2])
indx -= 3
element += 3
elif indx == 2:
query = query.outerjoin(join[element], join[element+1])
indx -= 2
element += 2
elif indx == 1:
query = query.outerjoin(join[element])
indx -= 1
element += 1
query = query.filter(db_filter)\
.filter(self.common_filters(allow_show_archived, viewing_tag_id=viewing_tag_id))
entries = list()
pagination = list()
try:
if database == Books:
total_count = query.with_entities(Books.id).distinct().count()
else:
total_count = query.count()
pagination = Pagination(page, pagesize, total_count)
entries = query.order_by(*order).offset(off).limit(pagesize).all()
except Exception as ex:
log.error_or_exception(ex)
# display authors in right order
entries = self.order_authors(entries, True, join_archive_read)
return entries, randm, pagination
# Orders all Authors in the list according to authors sort
def order_authors(self, entries, list_return=False, combined=False):
self.ensure_session()
for entry in entries:
if combined:
sort_authors = entry.Books.author_sort.split('&')
ids = [a.id for a in entry.Books.authors]
else:
sort_authors = entry.author_sort.split('&')
ids = [a.id for a in entry.authors]
authors_ordered = list()
# error = False
for auth in sort_authors:
auth = strip_whitespaces(auth)
# Skip empty author strings to prevent spurious errors
if not auth:
continue
results = self.session.query(Authors).filter(Authors.sort == auth).all()
# ToDo: How to handle not found author name
if not len(results):
log.error("Author '{}' not found to display name in right order".format(auth))
# error = True
break
for r in results:
if r.id in ids:
authors_ordered.append(r)
ids.remove(r.id)
for author_id in ids:
result = self.session.query(Authors).filter(Authors.id == author_id).first()
authors_ordered.append(result)
if list_return:
if combined:
entry.Books.authors = authors_ordered
else:
entry.ordered_authors = authors_ordered
else:
return authors_ordered
return entries
def get_typeahead(self, database, query, replace=('', ''), tag_filter=true()):
self.ensure_session()
query = query or ''
self.create_functions()
# self.session.connection().connection.connection.create_function("lower", 1, lcase)
entries = self.session.query(database).filter(tag_filter). \
filter(func.lower(database.name).ilike("%" + query + "%")).all()
# json_dumps = json.dumps([dict(name=escape(r.name.replace(*replace))) for r in entries])
json_dumps = json.dumps([dict(name=r.name.replace(*replace)) for r in entries])
return json_dumps
def check_exists_book(self, authr, title):
self.ensure_session()
self.create_functions()
# self.session.connection().connection.connection.create_function("lower", 1, lcase)
q = list()
author_terms = re.split(r'\s*&\s*', authr)
for author_term in author_terms:
q.append(Books.authors.any(func.lower(Authors.name).ilike("%" + author_term + "%")))
return self.session.query(Books) \
.filter(and_(Books.authors.any(and_(*q)), func.lower(Books.title).ilike("%" + title + "%"))).first()
def search_query(self, term, config, *join):
self.ensure_session()
strip_whitespaces(term).lower()
self.create_functions()
# self.session.connection().connection.connection.create_function("lower", 1, lcase)
q = list()
author_terms = re.split("[, ]+", term)
for author_term in author_terms:
q.append(Books.authors.any(func.lower(Authors.name).ilike("%" + author_term + "%")))
query = self.generate_linked_query(config.config_read_column, Books)
if len(join) == 6:
query = query.outerjoin(join[0], join[1]).outerjoin(join[2]).outerjoin(join[3], join[4]).outerjoin(join[5])
if len(join) == 3:
query = query.outerjoin(join[0], join[1]).outerjoin(join[2])
elif len(join) == 2:
query = query.outerjoin(join[0], join[1])
elif len(join) == 1:
query = query.outerjoin(join[0])
cc = self.get_cc_columns(config, filter_config_custom_read=True)
filter_expression = [Books.tags.any(func.lower(Tags.name).ilike("%" + term + "%")),
Books.series.any(func.lower(Series.name).ilike("%" + term + "%")),
Books.authors.any(and_(*q)),
Books.publishers.any(func.lower(Publishers.name).ilike("%" + term + "%")),
func.lower(Books.title).ilike("%" + term + "%")]
for c in cc:
if c.datatype not in ["datetime", "rating", "bool", "int", "float"]:
filter_expression.append(
getattr(Books,
'custom_column_' + str(c.id)).any(
func.lower(cc_classes[c.id].value).ilike("%" + term + "%")))
# Eagerly load the data relationship to prevent session errors
query = query.options(joinedload(Books.data))
return query.filter(self.common_filters(True)).filter(or_(*filter_expression))
def get_cc_columns(self, config, filter_config_custom_read=False):
self.ensure_session()
tmp_cc = self.session.query(CustomColumns).filter(CustomColumns.datatype.notin_(cc_exceptions)).all()
cc = []
r = None
if config.config_columns_to_ignore:
r = re.compile(config.config_columns_to_ignore)
for col in tmp_cc:
if filter_config_custom_read and config.config_read_column and config.config_read_column == col.id:
continue
if r and r.match(col.name):
continue
cc.append(col)
return cc
# read search results from calibre-database and return it (function is used for feed and simple search
def get_search_results(self, term, config, offset=None, order=None, limit=None, *join):
self.ensure_session()
order = order[0] if order else [Books.sort]
pagination = None
result = self.search_query(term, config, *join).order_by(*order).all()
result_count = len(result)
if offset is not None and limit is not None:
offset = int(offset)
limit_all = offset + int(limit)
pagination = Pagination((offset / (int(limit)) + 1), limit, result_count)
else:
offset = 0
limit_all = result_count
ub.store_combo_ids(result)
entries = self.order_authors(result[offset:limit_all], list_return=True, combined=True)
return entries, result_count, pagination
# Creates for all stored languages a translated speaking name in the array for the UI
def speaking_language(self, languages=None, return_all_languages=False, with_count=False, reverse_order=False):
self.ensure_session()
if with_count:
if not languages:
languages = self.session.query(Languages, func.count('books_languages_link.book'))\
.join(books_languages_link).join(Books)\
.filter(self.common_filters(return_all_languages=return_all_languages)) \
.group_by(text('books_languages_link.lang_code')).all()
tags = list()
for lang in languages:
tag = Category(isoLanguages.get_language_name(get_locale(), lang[0].lang_code), lang[0].lang_code)
tags.append([tag, lang[1]])
# Append all books without language to list
if not return_all_languages:
no_lang_count = (self.session.query(Books)
.outerjoin(books_languages_link).outerjoin(Languages)
.filter(Languages.lang_code==None)
.filter(self.common_filters())
.count())
if no_lang_count:
tags.append([Category(_("None"), "none"), no_lang_count])
return sorted(tags, key=lambda x: x[0].name.lower(), reverse=reverse_order)
else:
if not languages:
languages = self.session.query(Languages) \
.join(books_languages_link) \
.join(Books) \
.filter(self.common_filters(return_all_languages=return_all_languages)) \
.group_by(text('books_languages_link.lang_code')).all()
for lang in languages:
lang.name = isoLanguages.get_language_name(get_locale(), lang.lang_code)
return sorted(languages, key=lambda x: x.name, reverse=reverse_order)
def create_functions(self, config=None):
self.ensure_session()
if self.session is None:
log.error("create_functions: Cannot create functions because session is None")
return
# user defined sort function for calibre databases (Series, etc.)
if config:
def _title_sort(title):
# calibre sort stuff
title_pat = re.compile(config.config_title_regex, re.IGNORECASE)
match = title_pat.search(title)
if match:
prep = match.group(1)
title = title[len(prep):] + ', ' + prep
return strip_whitespaces(title)
try:
# sqlalchemy <1.4.24 and sqlalchemy 2.0
conn = self.session.connection().connection.driver_connection
except AttributeError:
# sqlalchemy >1.4.24
conn = self.session.connection().connection.connection
try:
if config:
conn.create_function("title_sort", 1, _title_sort)
conn.create_function('uuid4', 0, lambda: str(uuid4()))
conn.create_function("lower", 1, lcase)
except sqliteOperationalError:
pass
@classmethod
def dispose(cls):
# global session
# Use lock to prevent concurrent dispose/reconnect operations
with cls._reconnect_lock:
for inst in cls.instances:
old_session = inst.session
inst.session = None
if old_session:
try:
old_session.close()
except Exception:
pass
if old_session.bind:
try:
old_session.bind.dispose()
except Exception:
pass
for attr in list(Books.__dict__.keys()):
if attr.startswith("custom_column_"):
setattr(Books, attr, None)
for db_class in cc_classes.values():
Base.metadata.remove(db_class.__table__)
cc_classes.clear()
for table in reversed(Base.metadata.sorted_tables):
name = table.key
if name.startswith("custom_column_") or name.startswith("books_custom_column_"):
if table is not None:
Base.metadata.remove(table)
def reconnect_db(self, config, app_db_path):
# Use lock to ensure atomic reconnect operation
with self._reconnect_lock:
# Be resilient if database wasn't initialized yet
try:
self.dispose()
except Exception:
# Ignore dispose errors during reconnect
pass
# engine is a class-level attribute that may be None before first setup
try:
if getattr(self, 'engine', None) is not None:
self.engine.dispose()
except Exception:
# Ignore engine dispose errors; we'll rebuild below
pass
# Rebuild engine/session factory and update config
self.setup_db(config.config_calibre_dir, app_db_path)
self.update_config(config)
def lcase(s):
try:
return unidecode.unidecode(s.lower())
except Exception as ex:
_log = logger.create()
_log.error_or_exception(ex)
return s.lower()
class Category:
name = None
id = None
count = None
rating = None
def __init__(self, name, cat_id, rating=None):
self.name = name
self.id = cat_id
self.rating = rating
self.count = 1