fix(gateway): quiet corrupt kanban dispatcher boards

Salvages substantive part of #26490 by @aqilaziz. Detects corrupt board
DBs ("file is not a database" / "database disk image is malformed")
and disables them by fingerprint until they're repaired, instead of
flooding the gateway log with repeated logger.exception tracebacks every
tick.

Cherry-picked the substantive commit (ea5b4ec2a); the tip commit was
an unrelated _is_dir OSError fix for service-path lookup. Dropped a
small test reformat that was bundled in the same commit.
This commit is contained in:
aqilaziz
2026-05-18 21:05:13 -07:00
committed by Teknium
parent 78da7efa20
commit d37574775b
2 changed files with 124 additions and 0 deletions
+48
View File
@@ -37,6 +37,7 @@ import signal
import tempfile
import threading
import time
import sqlite3
from collections import OrderedDict
from contextvars import copy_context
from pathlib import Path
@@ -4830,6 +4831,28 @@ class GatewayRunner:
HEALTH_WINDOW = 6
bad_ticks = 0
last_warn_at = 0
disabled_corrupt_boards: dict[str, tuple[str, int | None, int | None]] = {}
def _board_db_fingerprint(slug: str) -> tuple[str, int | None, int | None]:
path = _kb.kanban_db_path(slug)
try:
resolved = str(path.expanduser().resolve())
except Exception:
resolved = str(path)
try:
stat = path.stat()
except OSError:
return (resolved, None, None)
return (resolved, stat.st_mtime_ns, stat.st_size)
def _is_corrupt_board_db_error(exc: Exception) -> bool:
if not isinstance(exc, sqlite3.DatabaseError):
return False
msg = str(exc).lower()
return (
"file is not a database" in msg
or "database disk image is malformed" in msg
)
def _tick_once_for_board(slug: str) -> "Optional[object]":
"""Run one dispatch_once for a specific board.
@@ -4841,6 +4864,16 @@ class GatewayRunner:
connection handle or accidentally claim across each other.
"""
conn = None
fingerprint = _board_db_fingerprint(slug)
disabled_fingerprint = disabled_corrupt_boards.get(slug)
if disabled_fingerprint == fingerprint:
return None
if disabled_fingerprint is not None:
logger.info(
"kanban dispatcher: board %s database changed; retrying dispatch",
slug,
)
disabled_corrupt_boards.pop(slug, None)
try:
conn = _kb.connect(board=slug)
# `connect()` runs the schema + idempotent migration on
@@ -4856,6 +4889,21 @@ class GatewayRunner:
max_in_progress=max_in_progress,
failure_limit=failure_limit,
)
except sqlite3.DatabaseError as exc:
if _is_corrupt_board_db_error(exc):
disabled_corrupt_boards[slug] = fingerprint
logger.error(
"kanban dispatcher: board %s database %s is not a valid "
"SQLite database; disabling dispatch for this board "
"until the file changes or the gateway restarts. Move "
"or restore the file, then run `hermes kanban init` if "
"you need a fresh board.",
slug,
fingerprint[0],
)
return None
logger.exception("kanban dispatcher: tick failed on board %s", slug)
return None
except Exception:
logger.exception("kanban dispatcher: tick failed on board %s", slug)
return None
@@ -3594,6 +3594,82 @@ def test_gateway_dispatcher_watcher_env_truthy_uses_config(monkeypatch):
)
def test_gateway_dispatcher_disables_corrupt_board_without_traceback(
monkeypatch, tmp_path, caplog
):
"""Corrupt board DBs log one actionable error and stop retrying per tick."""
import asyncio
import logging
import sqlite3
from gateway.run import GatewayRunner
import hermes_cli.config as _cfg_mod
import hermes_cli.kanban_db as _kb
runner = object.__new__(GatewayRunner)
runner._running = True
corrupt_db = tmp_path / "kanban.db"
corrupt_db.write_text("not sqlite", encoding="utf-8")
monkeypatch.setattr(
_cfg_mod,
"load_config",
lambda: {
"kanban": {
"dispatch_in_gateway": True,
"dispatch_interval_seconds": 1,
}
},
)
monkeypatch.setattr(
_kb,
"list_boards",
lambda include_archived=False: [{"slug": _kb.DEFAULT_BOARD}],
)
monkeypatch.setattr(
_kb,
"read_board_metadata",
lambda slug: {"slug": slug},
)
monkeypatch.setattr(_kb, "kanban_db_path", lambda board=None: corrupt_db)
calls = {"connect": 0, "to_thread": 0}
def _connect(*args, **kwargs):
calls["connect"] += 1
raise sqlite3.DatabaseError("file is not a database")
async def _to_thread(fn, *args, **kwargs):
calls["to_thread"] += 1
result = fn(*args, **kwargs)
if calls["to_thread"] >= 4:
runner._running = False
return result
async def _sleep(_delay):
return None
monkeypatch.setattr(_kb, "connect", _connect)
monkeypatch.setattr("gateway.run.asyncio.to_thread", _to_thread)
monkeypatch.setattr("gateway.run.asyncio.sleep", _sleep)
with caplog.at_level(logging.ERROR, logger="gateway.run"):
asyncio.run(
asyncio.wait_for(
runner._kanban_dispatcher_watcher(),
timeout=3.0,
)
)
messages = [record.getMessage() for record in caplog.records]
assert sum("not a valid SQLite database" in msg for msg in messages) == 1
assert not any("tick failed on board" in msg for msg in messages)
assert not any(record.exc_info for record in caplog.records)
# First tick connect + two ready-queue probes. The second dispatch tick
# skips connect because the corrupt board fingerprint is disabled.
assert calls["connect"] == 3
# ---------------------------------------------------------------------------
# Hallucination gate (created_cards verify + prose scan)
# ---------------------------------------------------------------------------