fix: scan stalls on platforms with 10k+ already-scanned ROMs

The scan was spending excessive time on large platforms even when all ROMs
were already scanned. Root causes: per-ROM UPDATE queries for skipped ROMs
(10k individual writes), missing composite index on (platform_id, fs_name)
causing full table scans, NOT IN clauses with 10k+ values in
mark_missing_roms(), and redundant filesystem reads.

Changes:
- Add bulk_mark_present() for batch-updating skipped ROMs in one query
- Move skip detection from _identify_rom to the batch loop so skipped ROMs
  never enter the async scan pipeline, and report progress for them
- Add composite index idx_roms_platform_id_fs_name via migration 0077
- Rewrite mark_missing_roms() with flip-based approach: mark all missing,
  then un-mark present ones in chunks of 1000
- Cache filesystem reads in scan_platforms() to avoid double directory
  traversal (precounting + scanning)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Georges-Antoine Assi
2026-04-11 19:20:40 -04:00
parent 686c609b3a
commit 3991e1b6ed
5 changed files with 421 additions and 43 deletions
@@ -0,0 +1,29 @@
"""Add composite index on (platform_id, fs_name) for roms table
Revision ID: 0077_add_platform_fs_name_index
Revises: 0076_play_sessions
Create Date: 2026-04-11 00:00:00.000000
"""
from alembic import op
# Revision identifiers, used by Alembic.
# `revision` is this migration's own identifier.
revision = "0077_add_platform_fs_name_index"
# `down_revision` names the migration this one revises (0076_play_sessions).
down_revision = "0076_play_sessions"
# No branch labels and no dependencies on other revisions are declared.
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create the non-unique ``idx_roms_platform_id_fs_name`` index on ``roms``.

    The index covers ``platform_id`` followed by ``fs_name``.
    """
    indexed_columns = ["platform_id", "fs_name"]
    with op.batch_alter_table("roms", schema=None) as roms_table:
        roms_table.create_index(
            "idx_roms_platform_id_fs_name", indexed_columns, unique=False
        )
def downgrade() -> None:
    """Drop the ``idx_roms_platform_id_fs_name`` index from ``roms``."""
    roms_batch = op.batch_alter_table("roms", schema=None)
    with roms_batch as roms_table:
        roms_table.drop_index("idx_roms_platform_id_fs_name")
+47 -31
View File
@@ -243,20 +243,6 @@ async def _identify_rom(
if redis_client.get(STOP_SCAN_FLAG):
return
if not _should_scan_rom(
scan_type=scan_type,
rom=rom,
roms_ids=roms_ids,
metadata_sources=metadata_sources,
):
if rom:
# Just to update the filesystem data
db_rom_handler.update_rom(
rom.id, {"fs_name": fs_rom["fs_name"], "missing_from_fs": False}
)
return
# Update properties that don't require metadata
parsed_tags = fs_rom_handler.parse_tags(fs_rom["fs_name"])
roms_path = fs_rom_handler.get_roms_fs_structure(platform.fs_slug)
@@ -465,6 +451,7 @@ async def _identify_platform(
socket_manager: socketio.AsyncRedisManager,
scan_stats: ScanStats,
calculate_hashes: bool = True,
fs_roms_cache: dict[str, list[FSRom]] | None = None,
) -> ScanStats:
# Stop the scan if the flag is set
if redis_client.get(STOP_SCAN_FLAG):
@@ -533,12 +520,15 @@ async def _identify_platform(
new_firmware=new_firmware,
)
# Scanning roms
try:
fs_roms = await fs_rom_handler.get_roms(platform)
except RomsNotFoundException as e:
log.error(e)
return scan_stats
# Scanning roms — use cached filesystem data if available
if fs_roms_cache is not None and platform_slug in fs_roms_cache:
fs_roms = fs_roms_cache[platform_slug]
else:
try:
fs_roms = await fs_rom_handler.get_roms(platform)
except RomsNotFoundException as e:
log.error(e)
return scan_stats
if len(fs_roms) == 0:
log.warning(
@@ -572,19 +562,41 @@ async def _identify_platform(
fs_names={fs_rom["fs_name"] for fs_rom in fs_roms_batch},
)
# Process ROMs concurrently within the batch
scan_tasks = [
scan_rom_with_semaphore(
fs_rom=fs_rom, rom=roms_by_fs_name.get(fs_rom["fs_name"])
# Separate skipped ROMs from those that need scanning
skipped_rom_ids: list[int] = []
roms_to_scan: list[tuple[FSRom, Rom | None]] = []
for fs_rom in fs_roms_batch:
rom = roms_by_fs_name.get(fs_rom["fs_name"])
if _should_scan_rom(
scan_type=scan_type,
rom=rom,
roms_ids=roms_ids,
metadata_sources=metadata_sources,
):
roms_to_scan.append((fs_rom, rom))
elif rom:
skipped_rom_ids.append(rom.id)
# Bulk update all skipped ROMs in one query instead of per-ROM updates
if skipped_rom_ids:
db_rom_handler.bulk_mark_present(platform.id, skipped_rom_ids)
await scan_stats.increment(
socket_manager=socket_manager,
scanned_roms=len(skipped_rom_ids),
)
for fs_rom in fs_roms_batch
# Process only ROMs that actually need scanning
scan_tasks = [
scan_rom_with_semaphore(fs_rom=fs_rom, rom=rom)
for fs_rom, rom in roms_to_scan
]
# Wait for all ROMs in the batch to complete
batched_results = await asyncio.gather(*scan_tasks, return_exceptions=True)
for result, fs_rom in zip(batched_results, fs_roms_batch, strict=False):
if isinstance(result, Exception):
log.error(f"Error scanning ROM {fs_rom['fs_name']}: {result}")
if scan_tasks:
batched_results = await asyncio.gather(*scan_tasks, return_exceptions=True)
for result, (fs_rom, _) in zip(batched_results, roms_to_scan, strict=False):
if isinstance(result, Exception):
log.error(f"Error scanning ROM {fs_rom['fs_name']}: {result}")
missing_roms = db_rom_handler.mark_missing_roms(
platform.id, [rom["fs_name"] for rom in fs_roms]
@@ -645,11 +657,14 @@ async def scan_platforms(
if MetadataSource.HLTB in metadata_sources:
meta_hltb_handler.initialize()
# Precalculate total platforms and ROMs
# Precalculate total platforms and ROMs, caching filesystem reads
# so _identify_platform() doesn't have to re-read the same directories
fs_roms_cache: dict[str, list[FSRom]] = {}
total_roms = 0
for platform_slug in fs_platforms:
try:
fs_roms = await fs_rom_handler.get_roms(Platform(fs_slug=platform_slug))
fs_roms_cache[platform_slug] = fs_roms
total_roms += len(fs_roms)
except RomsNotFoundException as e:
log.error(e)
@@ -694,6 +709,7 @@ async def scan_platforms(
socket_manager=socket_manager,
scan_stats=scan_stats,
calculate_hashes=calculate_hashes,
fs_roms_cache=fs_roms_cache,
)
missed_platforms = db_platform_handler.mark_missing_platforms(fs_platforms)
+52 -12
View File
@@ -922,7 +922,6 @@ class DBRomsHandler(DBBaseHandler):
)
@begin_session
@with_details
def get_roms_by_fs_name(
self,
platform_id: int,
@@ -966,6 +965,31 @@ class DBRomsHandler(DBBaseHandler):
.execution_options(synchronize_session="evaluate")
)
@begin_session
def bulk_mark_present(
    self,
    platform_id: int,
    rom_ids: list[int],
    session: Session = None,  # type: ignore
) -> None:
    """Clear ``missing_from_fs`` on the given ROMs of one platform.

    Args:
        platform_id: Only rows with this ``platform_id`` are updated.
        rom_ids: IDs of the ROMs to update; an empty list issues no query.
        session: Database session. Callers in this file omit it, so it is
            presumably injected by ``@begin_session`` -- TODO confirm.
    """
    if not rom_ids:
        return

    # One UPDATE per slice of at most 1000 IDs keeps each IN (...) list bounded.
    batch_size = 1000
    for start in range(0, len(rom_ids), batch_size):
        id_batch = rom_ids[start : start + batch_size]
        same_platform = Rom.platform_id == platform_id
        id_in_batch = Rom.id.in_(id_batch)
        statement = (
            update(Rom)
            .where(and_(same_platform, id_in_batch))
            .values(missing_from_fs=False)
            .execution_options(synchronize_session="evaluate")
        )
        session.execute(statement)
@begin_session
def mark_missing_roms(
self,
@@ -973,30 +997,46 @@ class DBRomsHandler(DBBaseHandler):
fs_roms_to_keep: list[str],
session: Session = None, # type: ignore
) -> Sequence[Rom]:
# Mark ALL ROMs for this platform as missing
session.execute(
update(Rom)
.where(Rom.platform_id == platform_id)
.values(missing_from_fs=True)
.execution_options(synchronize_session="evaluate")
)
# Un-mark ROMs that exist on filesystem (in chunks)
for i in range(0, len(fs_roms_to_keep), 1000):
chunk = fs_roms_to_keep[i : i + 1000]
session.execute(
update(Rom)
.where(
and_(
Rom.platform_id == platform_id,
Rom.fs_name.in_(chunk),
)
)
.values(missing_from_fs=False)
.execution_options(synchronize_session="evaluate")
)
# Return the ones that remained marked as missing (for logging)
missing_roms = (
session.scalars(
select(Rom)
.options(load_only(Rom.id, Rom.fs_name))
.order_by(Rom.fs_name.asc())
.where(
and_(
Rom.platform_id == platform_id,
Rom.fs_name.not_in(fs_roms_to_keep),
Rom.missing_from_fs.is_(True),
)
)
)
.unique()
.all()
)
session.execute(
update(Rom)
.where(
and_(
Rom.platform_id == platform_id, Rom.fs_name.not_in(fs_roms_to_keep)
)
)
.values(**{"missing_from_fs": True})
.execution_options(synchronize_session="evaluate")
)
return missing_roms
@begin_session
+1
View File
@@ -173,6 +173,7 @@ class Rom(BaseModel):
Index("idx_roms_flashpoint_id", "flashpoint_id"),
Index("idx_roms_hltb_id", "hltb_id"),
Index("idx_roms_gamelist_id", "gamelist_id"),
Index("idx_roms_platform_id_fs_name", "platform_id", "fs_name"),
)
fs_name: Mapped[str] = mapped_column(String(length=FILE_NAME_MAX_LENGTH))
+292
View File
@@ -340,6 +340,298 @@ def test_article_stripping_sort(platform: Platform):
assert [r.name for r in roms] == ["The Legend", "A Quest", "Zelda"]
def test_bulk_mark_present(platform: Platform):
    """bulk_mark_present clears missing_from_fs only for the ROM IDs it is given."""
    created = [
        db_rom_handler.add_rom(
            Rom(
                platform_id=platform.id,
                name=f"rom_{n}",
                slug=f"rom-{n}",
                fs_name=f"rom_{n}.zip",
                fs_name_no_tags=f"rom_{n}",
                fs_name_no_ext=f"rom_{n}",
                fs_extension="zip",
                fs_path=f"{platform.slug}/roms",
                missing_from_fs=True,
            )
        )
        for n in range(5)
    ]
    # The first three ROMs are marked present; the last two are left alone.
    marked, untouched = created[:3], created[3:]

    db_rom_handler.bulk_mark_present(platform.id, [entry.id for entry in marked])

    for entry in marked:
        refreshed = db_rom_handler.get_rom(entry.id)
        assert refreshed is not None
        assert refreshed.missing_from_fs is False

    for entry in untouched:
        refreshed = db_rom_handler.get_rom(entry.id)
        assert refreshed is not None
        assert refreshed.missing_from_fs is True
def test_bulk_mark_present_empty_list(platform: Platform):
    """Passing no IDs to bulk_mark_present leaves existing ROMs unchanged."""
    lonely_rom = Rom(
        platform_id=platform.id,
        name="rom_lonely",
        slug="rom-lonely",
        fs_name="rom_lonely.zip",
        fs_name_no_tags="rom_lonely",
        fs_name_no_ext="rom_lonely",
        fs_extension="zip",
        fs_path=f"{platform.slug}/roms",
        missing_from_fs=True,
    )
    stored = db_rom_handler.add_rom(lonely_rom)

    db_rom_handler.bulk_mark_present(platform.id, [])

    # The ROM was created as missing and must still be flagged that way.
    refreshed = db_rom_handler.get_rom(stored.id)
    assert refreshed is not None
    assert refreshed.missing_from_fs is True
def test_bulk_mark_present_chunking(platform: Platform):
    """bulk_mark_present updates every ROM when given more than 1000 IDs."""
    created = [
        db_rom_handler.add_rom(
            Rom(
                platform_id=platform.id,
                name=f"rom_{n}",
                slug=f"rom-{n}",
                fs_name=f"rom_{n}.zip",
                fs_name_no_tags=f"rom_{n}",
                fs_name_no_ext=f"rom_{n}",
                fs_extension="zip",
                fs_path=f"{platform.slug}/roms",
                missing_from_fs=True,
            )
        )
        for n in range(1050)
    ]

    db_rom_handler.bulk_mark_present(platform.id, [entry.id for entry in created])

    # Sample both sides of the 1000-ID slice boundary plus the two ends.
    for position in (0, 999, 1000, 1049):
        refreshed = db_rom_handler.get_rom(created[position].id)
        assert refreshed is not None
        assert refreshed.missing_from_fs is False
def test_mark_missing_roms_small_platform(platform: Platform):
    """mark_missing_roms correctly identifies missing ROMs with a small keep list.

    Three ROMs are stored; two are listed as present on the filesystem. The
    one left out must be returned and persisted as missing, and both kept
    ROMs must be persisted as not missing.
    """

    def add_rom(letter: str) -> Rom:
        # Store a minimal ROM named rom_<letter>.zip on the test platform.
        return db_rom_handler.add_rom(
            Rom(
                platform_id=platform.id,
                name=f"rom_{letter}",
                slug=f"rom-{letter}",
                fs_name=f"rom_{letter}.zip",
                fs_name_no_tags=f"rom_{letter}",
                fs_name_no_ext=f"rom_{letter}",
                fs_extension="zip",
                fs_path=f"{platform.slug}/roms",
            )
        )

    rom_a = add_rom("a")
    rom_b = add_rom("b")
    rom_c = add_rom("c")

    # Keep only rom_a and rom_c
    missing = db_rom_handler.mark_missing_roms(platform.id, ["rom_a.zip", "rom_c.zip"])
    assert len(missing) == 1
    assert missing[0].fs_name == "rom_b.zip"

    updated_b = db_rom_handler.get_rom(rom_b.id)
    assert updated_b is not None
    assert updated_b.missing_from_fs is True

    # Every kept ROM must end up un-marked, not just the first one.
    for kept in (rom_a, rom_c):
        updated_kept = db_rom_handler.get_rom(kept.id)
        assert updated_kept is not None
        assert updated_kept.missing_from_fs is False
def test_mark_missing_roms_large_platform(platform: Platform):
    """mark_missing_roms correctly identifies missing ROMs with a large keep list.

    mark_missing_roms un-marks present ROMs in slices of 1000 names, so the
    keep list here is longer than one slice and the only real name sits past
    the first 1000 entries. The ROM can therefore only be un-marked if the
    later slices are processed too.
    """
    rom_present = db_rom_handler.add_rom(
        Rom(
            platform_id=platform.id,
            name="rom_present",
            slug="rom-present",
            fs_name="rom_present.zip",
            fs_name_no_tags="rom_present",
            fs_name_no_ext="rom_present",
            fs_extension="zip",
            fs_path=f"{platform.slug}/roms",
        )
    )
    rom_missing = db_rom_handler.add_rom(
        Rom(
            platform_id=platform.id,
            name="rom_missing",
            slug="rom-missing",
            fs_name="rom_missing.zip",
            fs_name_no_tags="rom_missing",
            fs_name_no_ext="rom_missing",
            fs_extension="zip",
            fs_path=f"{platform.slug}/roms",
        )
    )

    # 1000 filler names fill the first slice; only rom_present.zip exists in
    # the DB and it lands at index 1000, i.e. in the second slice.
    fs_roms_to_keep = [f"filler_{i}.zip" for i in range(1000)] + ["rom_present.zip"]

    missing = db_rom_handler.mark_missing_roms(platform.id, fs_roms_to_keep)
    assert len(missing) == 1
    assert missing[0].fs_name == "rom_missing.zip"

    updated_present = db_rom_handler.get_rom(rom_present.id)
    assert updated_present is not None
    assert updated_present.missing_from_fs is False

    updated_missing = db_rom_handler.get_rom(rom_missing.id)
    assert updated_missing is not None
    assert updated_missing.missing_from_fs is True
def test_mark_missing_roms_large_platform_all_present(platform: Platform):
    """When all ROMs are in the keep list, none should be marked missing.

    The keep list spans more than one 1000-name slice, with real names in
    both the first and the second slice, so every slice has to be applied for
    the result to be empty.
    """
    roms = []
    for i in range(3):
        rom = db_rom_handler.add_rom(
            Rom(
                platform_id=platform.id,
                name=f"rom_{i}",
                slug=f"rom-{i}",
                fs_name=f"rom_{i}.zip",
                fs_name_no_tags=f"rom_{i}",
                fs_name_no_ext=f"rom_{i}",
                fs_extension="zip",
                fs_path=f"{platform.slug}/roms",
            )
        )
        roms.append(rom)

    # rom_0.zip is in the first slice; rom_1.zip and rom_2.zip follow 1000
    # filler names and so fall in the second slice.
    fs_roms_to_keep = (
        ["rom_0.zip"]
        + [f"filler_{i}.zip" for i in range(1000)]
        + ["rom_1.zip", "rom_2.zip"]
    )

    missing = db_rom_handler.mark_missing_roms(platform.id, fs_roms_to_keep)
    assert len(missing) == 0

    for rom in roms:
        updated = db_rom_handler.get_rom(rom.id)
        assert updated is not None
        assert updated.missing_from_fs is False
def test_mark_missing_roms_large_platform_all_missing(platform: Platform):
    """Every ROM is reported missing when the keep list names none of them."""
    for n in range(3):
        db_rom_handler.add_rom(
            Rom(
                platform_id=platform.id,
                name=f"rom_{n}",
                slug=f"rom-{n}",
                fs_name=f"rom_{n}.zip",
                fs_name_no_tags=f"rom_{n}",
                fs_name_no_ext=f"rom_{n}",
                fs_extension="zip",
                fs_path=f"{platform.slug}/roms",
            )
        )

    # The keep list is pure filler: no entry matches a stored ROM.
    filler_names = [f"filler_{n}.zip" for n in range(501)]
    reported = db_rom_handler.mark_missing_roms(platform.id, filler_names)

    assert len(reported) == 3
    assert {entry.fs_name for entry in reported} == {
        "rom_0.zip",
        "rom_1.zip",
        "rom_2.zip",
    }
def test_mark_missing_roms_does_not_affect_other_platforms(platform: Platform):
    """mark_missing_roms should only affect ROMs on the target platform.

    One ROM is stored on the target platform and one on a second platform.
    Neither name is in the keep list, so the target ROM must be returned and
    persisted as missing while the other platform's ROM stays untouched.
    """
    other_platform = db_platform_handler.add_platform(
        Platform(
            name="other_platform",
            slug="other_platform_slug",
            fs_slug="other_platform_slug",
        )
    )
    rom_on_target = db_rom_handler.add_rom(
        Rom(
            platform_id=platform.id,
            name="target_rom",
            slug="target-rom",
            fs_name="target_rom.zip",
            fs_name_no_tags="target_rom",
            fs_name_no_ext="target_rom",
            fs_extension="zip",
            fs_path=f"{platform.slug}/roms",
        )
    )
    rom_on_other = db_rom_handler.add_rom(
        Rom(
            platform_id=other_platform.id,
            name="other_rom",
            slug="other-rom",
            fs_name="other_rom.zip",
            fs_name_no_tags="other_rom",
            fs_name_no_ext="other_rom",
            fs_extension="zip",
            fs_path=f"{other_platform.slug}/roms",
        )
    )

    # The keep list holds only filler names, so nothing on the target
    # platform is kept.
    fs_roms_to_keep = [f"filler_{i}.zip" for i in range(501)]

    missing = db_rom_handler.mark_missing_roms(platform.id, fs_roms_to_keep)
    assert len(missing) == 1
    assert missing[0].fs_name == "target_rom.zip"

    # The target platform's ROM must be persisted as missing, not just returned.
    updated_target = db_rom_handler.get_rom(rom_on_target.id)
    assert updated_target is not None
    assert updated_target.missing_from_fs is True

    # Other platform's ROM should be untouched
    updated_other = db_rom_handler.get_rom(rom_on_other.id)
    assert updated_other is not None
    assert updated_other.missing_from_fs is False
def test_users(admin_user):
db_user_handler.add_user(
User(