fix(gateway): restore Telegram DM topic thread_id after session split (#27166)

When context compression triggers a mid-turn session split, source.thread_id
can be None on synthetic/recovered events. _thread_metadata_for_source then
returns None, causing the Telegram adapter to send with no message_thread_id
and the response lands in the General thread instead of the active DM topic.

Fix:
- hermes_state.py: Add get_telegram_topic_binding_by_session() for reverse
  lookup by session_id (enabled by the existing UNIQUE INDEX on session_id).
- gateway/run.py: After session-split detection, if source is a Telegram DM
  and source.thread_id is None, recover it from the binding via the new
  method so _thread_metadata_for_source produces the correct thread routing.
- tests/: Coverage for the new lookup method and the recovery flow.
This commit is contained in:
JackJin
2026-05-17 11:08:26 +08:00
committed by Teknium
parent 5734c3fb10
commit 95a0955e19
3 changed files with 149 additions and 0 deletions
+31
View File
@@ -16544,6 +16544,37 @@ class GatewayRunner:
entry.session_id = agent.session_id
self.session_store._save()
# If this is a Telegram DM and source.thread_id was lost during
# the session split (synthetic / recovered event), restore it
# from the binding so _thread_metadata_for_source produces the
# correct message_thread_id instead of routing to the General
# thread. Failure here is non-fatal — we log and continue;
# worst case the message lands in General, which is the
# pre-fix behaviour.
if (
getattr(source, "platform", None) == Platform.TELEGRAM
and getattr(source, "chat_type", None) == "dm"
and getattr(source, "thread_id", None) is None
and self._session_db is not None
):
try:
_binding = self._session_db.get_telegram_topic_binding_by_session(
session_id=agent.session_id,
)
if _binding and _binding.get("thread_id"):
source.thread_id = str(_binding["thread_id"])
logger.debug(
"Restored source.thread_id=%s from binding after session split %s%s",
source.thread_id,
session_id,
agent.session_id,
)
except Exception:
logger.debug(
"Failed to restore thread_id from binding after session split",
exc_info=True,
)
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
# When compression created a new session, the messages list was
+24
View File
@@ -2852,6 +2852,30 @@ class SessionDB:
return []
return [dict(row) for row in rows]
def get_telegram_topic_binding_by_session(
self,
*,
session_id: str,
) -> Optional[Dict[str, Any]]:
"""Return the Telegram DM topic binding for a given session_id, if present.
Uses the UNIQUE INDEX on telegram_dm_topic_bindings(session_id) for an
efficient reverse lookup. Returns None when the session has no binding or
the table does not exist yet.
"""
with self._lock:
try:
row = self._conn.execute(
"""
SELECT * FROM telegram_dm_topic_bindings
WHERE session_id = ?
""",
(str(session_id),),
).fetchone()
except sqlite3.OperationalError:
return None
return dict(row) if row else None
def bind_telegram_topic(
self,
*,
+94
View File
@@ -1229,6 +1229,100 @@ def test_list_telegram_topic_bindings_for_chat_no_table(tmp_path):
assert tables == set()
# ---------------------------------------------------------------------------
# Tests for get_telegram_topic_binding_by_session (issue #27166)
# ---------------------------------------------------------------------------
def test_get_telegram_topic_binding_by_session_returns_binding(tmp_path):
"""Reverse lookup by session_id returns the binding row."""
db = SessionDB(db_path=tmp_path / "state.db")
db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988")
db.create_session(session_id="sess-27166", source="telegram", user_id="208214988")
db.bind_telegram_topic(
chat_id="208214988",
thread_id="17585",
user_id="208214988",
session_key="agent:main:telegram:dm:208214988:17585",
session_id="sess-27166",
)
binding = db.get_telegram_topic_binding_by_session(session_id="sess-27166")
assert binding is not None
assert binding["chat_id"] == "208214988"
assert binding["thread_id"] == "17585"
assert binding["session_id"] == "sess-27166"
def test_get_telegram_topic_binding_by_session_returns_none_for_unknown(tmp_path):
"""Returns None when no binding exists for the given session_id."""
db = SessionDB(db_path=tmp_path / "state.db")
db.apply_telegram_topic_migration()
result = db.get_telegram_topic_binding_by_session(session_id="nonexistent-sess")
assert result is None
# ---------------------------------------------------------------------------
# Test for session-split thread_id recovery (issue #27166)
# ---------------------------------------------------------------------------
def test_session_split_restores_source_thread_id_from_binding(tmp_path):
"""After a session split, source.thread_id is restored from the binding.
Simulates the case where context compression creates a new session_id and
source.thread_id is None (synthetic/recovered event). The recovery block
must look up the binding by the new session_id and restore thread_id on
source so that _thread_metadata_for_source returns the correct thread.
"""
from gateway.run import GatewayRunner
from gateway.config import Platform
db = SessionDB(db_path=tmp_path / "state.db")
db.enable_telegram_topic_mode(chat_id="208214988", user_id="208214988")
db.create_session(session_id="sess-split-new", source="telegram", user_id="208214988")
db.bind_telegram_topic(
chat_id="208214988",
thread_id="17585",
user_id="208214988",
session_key="agent:main:telegram:dm:208214988:17585",
session_id="sess-split-new",
)
runner = object.__new__(GatewayRunner)
runner._session_db = db
# Build a source that looks like it came from a synthetic/recovered event:
# platform and chat_type match a Telegram DM, but thread_id is None.
source = _make_source(thread_id=None)
assert source.platform == Platform.TELEGRAM
assert source.chat_type == "dm"
assert source.thread_id is None
# Simulate the session-split recovery block logic directly.
if (
getattr(source, "platform", None) == Platform.TELEGRAM
and getattr(source, "chat_type", None) == "dm"
and getattr(source, "thread_id", None) is None
and runner._session_db is not None
):
try:
_binding = runner._session_db.get_telegram_topic_binding_by_session(
session_id="sess-split-new",
)
if _binding and _binding.get("thread_id"):
source.thread_id = str(_binding["thread_id"])
except Exception:
pass
assert source.thread_id == "17585", (
"thread_id must be restored from the binding after session split"
)
# Confirm _thread_metadata_for_source now returns non-None.
runner.config = _make_runner(session_db=db).config
runner.adapters = _make_runner(session_db=db).adapters
meta = GatewayRunner._thread_metadata_for_source(runner, source)
assert meta is not None
assert meta["thread_id"] == "17585"