diff --git a/plugins/web/xai/__init__.py b/plugins/web/xai/__init__.py new file mode 100644 index 0000000000..9ec4a58899 --- /dev/null +++ b/plugins/web/xai/__init__.py @@ -0,0 +1,14 @@ +"""xAI web search plugin — bundled, auto-loaded. + +Mirrors the ``plugins/web/brave_free/`` layout: ``provider.py`` holds the +provider class, ``__init__.py::register(ctx)`` registers an instance. +""" + +from __future__ import annotations + +from plugins.web.xai.provider import XAIWebSearchProvider + + +def register(ctx) -> None: + """Register the xAI Web Search provider with the plugin context.""" + ctx.register_web_search_provider(XAIWebSearchProvider()) diff --git a/plugins/web/xai/plugin.yaml b/plugins/web/xai/plugin.yaml new file mode 100644 index 0000000000..03874fea98 --- /dev/null +++ b/plugins/web/xai/plugin.yaml @@ -0,0 +1,7 @@ +name: web-xai +version: 1.0.0 +description: "xAI Web Search — search the web via Grok's agentic web_search tool (Responses API). Requires xAI Grok OAuth (via `hermes auth`) or XAI_API_KEY (https://x.ai)." +author: NousResearch +kind: backend +provides_web_providers: + - xai diff --git a/plugins/web/xai/provider.py b/plugins/web/xai/provider.py new file mode 100644 index 0000000000..a74b6a683e --- /dev/null +++ b/plugins/web/xai/provider.py @@ -0,0 +1,560 @@ +"""xAI Web Search — plugin form. + +Routes ``web_search`` tool calls through xAI's agentic Web Search tool +(server-side ``web_search`` on the Responses API). Grok runs the actual +searching and page-browsing server-side; we ask it to return the top +results as structured JSON so we can hand back the same +``{title, url, description, position}`` rows every other Hermes web +provider produces. + +Reference: https://docs.x.ai/developers/tools/web-search + +Config keys this provider responds to:: + + web: + search_backend: "xai" # explicit per-capability + backend: "xai" # shared fallback + +Optional knobs (under ``web.xai`` in ``config.yaml``):: + + web: + xai: + model: "grok-4.3" # reasoning model required by web_search + allowed_domains: ["x.ai"] # max 5 — mutually exclusive with excluded_domains + excluded_domains: ["bad.com"] # max 5 — mutually exclusive with allowed_domains + timeout: 90 # seconds (default 90) + +Auth: reuses :func:`tools.xai_http.resolve_xai_http_credentials`, which +prefers Hermes-managed xAI Grok OAuth (via ``hermes auth``) and falls back +to ``XAI_API_KEY`` (resolved through ``~/.hermes/.env``, then +``os.environ``). +""" + +from __future__ import annotations + +import json +import logging +import re +from typing import Any, Dict, List, Optional + +from agent.web_search_provider import WebSearchProvider +from tools.xai_http import ( + has_xai_credentials, + hermes_xai_user_agent, + resolve_xai_http_credentials, +) + +logger = logging.getLogger(__name__) + +DEFAULT_MODEL = "grok-4.3" +DEFAULT_TIMEOUT = 90 +_MAX_DOMAIN_FILTERS = 5 # xAI hard cap on allowed_domains / excluded_domains + +# Match the JSON object Grok is asked to emit. Tolerates leading/trailing +# prose since reasoning models occasionally narrate before the JSON block +# even when explicitly asked not to. +_JSON_BLOCK_RE = re.compile(r"\{[\s\S]*\}", re.MULTILINE) + + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- + + +def _load_xai_web_config() -> Dict[str, Any]: + """Read ``web.xai`` from config.yaml (returns {} on miss).""" + try: + from hermes_cli.config import load_config + + cfg = load_config() + web_section = cfg.get("web") if isinstance(cfg, dict) else None + xai_section = web_section.get("xai") if isinstance(web_section, dict) else None + return xai_section if isinstance(xai_section, dict) else {} + except Exception as exc: # noqa: BLE001 + logger.debug("Could not load web.xai config: %s", exc) + return {} + + +def _coerce_domain_list(value: Any) -> List[str]: + """Coerce a config value to a clean list of <=5 domain strings.""" + if not isinstance(value, list): + return [] + cleaned: List[str] = [] + for item in value: + if isinstance(item, str) and item.strip(): + cleaned.append(item.strip()) + if len(cleaned) >= _MAX_DOMAIN_FILTERS: + break + return cleaned + + +# --------------------------------------------------------------------------- +# Provider +# --------------------------------------------------------------------------- + + +class XAIWebSearchProvider(WebSearchProvider): + """Search-only provider backed by xAI's agentic Web Search tool. + + Sends a structured prompt to Grok with ``tools=[{"type": "web_search"}]`` + enabled and asks it to return the top *limit* results as JSON. Falls + back to the Responses API ``citations`` list if Grok ignores the JSON + schema instruction (rare for grok-4.3 but cheap insurance). + + No extract capability — pair with Firecrawl / Tavily / Exa for + ``web_extract`` if you need page content. + + Trust model + ----------- + Unlike index-backed providers (Brave / Tavily / Exa) which return + verbatim search-engine results, this backend is an LLM in a trench + coat: Grok decides which URLs to surface, generates the titles and + descriptions itself, and is influenced by the *content of the query*. + A maliciously crafted query (e.g. injected via untrusted upstream + input the agent picked up) can in principle steer Grok into emitting + attacker-chosen URLs. Callers that pipe untrusted text directly into + ``web_search`` should treat returned URLs the same way they would + treat any model-generated link — validate before fetching. + """ + + @property + def name(self) -> str: + return "xai" + + @property + def display_name(self) -> str: + return "xAI Web Search (Grok)" + + def is_available(self) -> bool: + """Cheap availability probe — env var OR auth-store has OAuth tokens. + + Delegates to :func:`tools.xai_http.has_xai_credentials`, which is + deliberately *not* the same as :func:`resolve_xai_http_credentials`: + it never triggers OAuth token refresh or acquires the auth-store + lock. The ABC contract requires this method to be safe to call on + every ``hermes tools`` repaint and at tool-registration time. + Token freshness / refresh is handled inside :meth:`search`. + """ + return has_xai_credentials() + + def supports_search(self) -> bool: + return True + + def supports_extract(self) -> bool: + return False + + def supports_crawl(self) -> bool: + return False + + # -- Search ----------------------------------------------------------- + + def search(self, query: str, limit: int = 5) -> Dict[str, Any]: + """Execute a Grok-backed web search. + + Returns ``{"success": True, "data": {"web": [{title, url, description, position}, ...]}}`` + on success, ``{"success": False, "error": str}`` on failure. + """ + try: + from tools.interrupt import is_interrupted + + if is_interrupted(): + return {"success": False, "error": "Interrupted"} + except Exception: # noqa: BLE001 — interrupt module is best-effort + pass + + creds = resolve_xai_http_credentials() + api_key = str(creds.get("api_key") or "").strip() + base_url = str(creds.get("base_url") or "https://api.x.ai/v1").strip().rstrip("/") + if not api_key: + return { + "success": False, + "error": ( + "No xAI credentials found. Run `hermes auth` to sign in with " + "xAI Grok OAuth, or set XAI_API_KEY." + ), + } + + # Clamp limit to the same range the caller (web_search_tool) accepts, + # so we don't silently downgrade explicit limits. Grok happily + # produces longer lists; cost scales linearly with the requested + # count via reasoning tokens, but that's the caller's call to make. + try: + limit = int(limit) + except (TypeError, ValueError): + limit = 5 + limit = max(1, min(limit, 100)) + + cfg = _load_xai_web_config() + model = cfg.get("model") if isinstance(cfg.get("model"), str) else DEFAULT_MODEL + model = model.strip() or DEFAULT_MODEL + + try: + timeout = float(cfg.get("timeout", DEFAULT_TIMEOUT)) + except (TypeError, ValueError): + timeout = DEFAULT_TIMEOUT + + allowed = _coerce_domain_list(cfg.get("allowed_domains")) + excluded = _coerce_domain_list(cfg.get("excluded_domains")) + if allowed and excluded: + # xAI explicitly rejects this combo — surface a clear error + # rather than a 400 from the API. + return { + "success": False, + "error": ( + "web.xai.allowed_domains and web.xai.excluded_domains " + "cannot both be set (xAI restriction)." + ), + } + + web_search_tool: Dict[str, Any] = {"type": "web_search"} + if allowed: + web_search_tool["filters"] = {"allowed_domains": allowed} + elif excluded: + web_search_tool["filters"] = {"excluded_domains": excluded} + + prompt = self._build_prompt(query, limit) + + payload: Dict[str, Any] = { + "model": model, + "input": [{"role": "user", "content": prompt}], + "tools": [web_search_tool], + # Drop inline citation markdown — we want the JSON block clean, + # and we read URLs from annotations / citations separately. + "include": ["no_inline_citations"], + } + + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + "User-Agent": hermes_xai_user_agent(), + } + + try: + import httpx + except ImportError: + return { + "success": False, + "error": "httpx is not installed (required for xAI web search)", + } + + logger.info( + "xAI web search via %s: '%s' (limit=%d, model=%s)", + base_url, query, limit, model, + ) + + # Two-attempt loop: if the first call returns 401 and our creds came + # from the OAuth path, force-refresh the token once and retry. This + # closes two gaps the proactive resolver check doesn't cover: + # (1) opaque (non-JWT) access tokens — `_xai_access_token_is_expiring` + # can't decode them and returns False, so refresh never fires + # until the server hands us a 401. + # (2) mid-window revocation — admin revoke, refresh-token rotation, + # or clock skew can produce 401s on a token whose JWT `exp` claim + # is still in the future. + # Env-var (`XAI_API_KEY`) credentials skip the retry entirely — we + # can't refresh those and an immediate retry would just burn quota. + is_oauth_path = (creds.get("provider") == "xai-oauth") + resp = None + for attempt in range(2): + try: + resp = httpx.post( + f"{base_url}/responses", + headers=headers, + json=payload, + timeout=timeout, + ) + resp.raise_for_status() + break + except httpx.HTTPStatusError as exc: + status = exc.response.status_code if exc.response is not None else 0 + if status == 401 and attempt == 0 and is_oauth_path: + logger.info( + "xAI web search got 401 on first attempt; forcing OAuth " + "refresh and retrying once.", + ) + try: + refreshed = resolve_xai_http_credentials(force_refresh=True) + refreshed_key = str(refreshed.get("api_key") or "").strip() + if refreshed_key and refreshed_key != api_key: + api_key = refreshed_key + headers["Authorization"] = f"Bearer {api_key}" + continue + # Refresh returned the same (or empty) token — no point + # in retrying. Fall through to the error return below. + except Exception as refresh_exc: # noqa: BLE001 + logger.warning( + "xAI web search OAuth refresh after 401 failed: %s", + refresh_exc, + ) + body = "" + try: + body = exc.response.text[:300] if exc.response is not None else "" + except Exception: + body = "" + logger.warning("xAI web search HTTP %d: %s", status, body) + return { + "success": False, + "error": f"xAI web search returned HTTP {status}: {body}".rstrip(), + } + except httpx.RequestError as exc: + logger.warning("xAI web search request error: %s", exc) + return {"success": False, "error": f"Could not reach xAI: {exc}"} + + if resp is None: + # Defensive — both attempts somehow exited the loop without resp. + return {"success": False, "error": "xAI web search produced no response"} + + try: + data = resp.json() + except Exception as exc: # noqa: BLE001 + logger.warning("xAI web search bad JSON: %s", exc) + return { + "success": False, + "error": "Could not parse xAI Responses API reply as JSON", + } + + # xAI's Responses surface sometimes returns HTTP 200 with an error + # envelope (model overloaded, content-policy refusal, etc.). Without + # this check, ``_extract_results`` would silently produce an empty + # list and we'd report success-with-no-rows — masking a real failure + # the agent should see and decide whether to retry. + api_error = data.get("error") if isinstance(data, dict) else None + if isinstance(api_error, dict): + err_msg = ( + api_error.get("message") + or api_error.get("code") + or "unknown error" + ) + logger.warning("xAI web search returned error envelope: %s", err_msg) + return {"success": False, "error": f"xAI returned an error: {err_msg}"} + + web_results = self._extract_results(data, limit=limit) + if not web_results: + # Successful call, just no usable rows — return success with an + # empty list so the model can decide whether to retry. Matches + # what brave-free / exa do when the upstream API returns 0 hits. + return {"success": True, "data": {"web": []}} + + return {"success": True, "data": {"web": web_results}} + + # -- Prompt + parsing ------------------------------------------------- + + @staticmethod + def _build_prompt(query: str, limit: int) -> str: + """Compose the prompt that asks Grok to act as a search engine. + + We deliberately ask for a JSON object (not bare array) so we can + match it cheaply with ``_JSON_BLOCK_RE``; we explicitly forbid + prose, markdown fences, and inline-citation links to keep the + payload parseable. + """ + return ( + "Use the web_search tool to find current information for the query below, " + "then respond with ONLY a single JSON object — no prose, no markdown " + "fences, no inline citation links — matching this exact schema:\n\n" + '{"results": [{"title": "string", "url": "string", ' + '"description": "1-2 sentence summary"}]}\n\n' + f'Return at most {limit} results, ordered by relevance, with absolute ' + "https:// URLs. If no usable results exist, return " + '{"results": []}.\n\n' + f"Query: {query}" + ) + + @classmethod + def _extract_results( + cls, + response_data: Dict[str, Any], + *, + limit: int, + ) -> List[Dict[str, Any]]: + """Pull a ``[{title, url, description, position}, ...]`` list out of a + Responses-API reply. + + Strategy: + + 1. Walk ``output[*].content[*].text`` for ``output_text`` blocks and + try to parse the first JSON object that has a ``results`` list. + 2. If the JSON path fails, fall back to the message annotations + (``url_citation`` entries) — every annotation carries a URL and + a ``title`` (citation number); we pair those URLs with surrounding + text from the message body as a best-effort description. + """ + text_blocks, annotations = cls._collect_output_text(response_data) + + # Primary path: parse the JSON object Grok was asked for. + for block in text_blocks: + parsed = cls._try_parse_json_results(block, limit=limit) + if parsed: + return parsed + + # Secondary path: derive results from message annotations + raw text. + # Only short-circuit when annotations actually yielded usable rows; + # otherwise fall through to the citations list. (xAI currently only + # emits ``url_citation`` annotations, but future annotation types + # would silently produce an empty result set if we returned here + # unconditionally — masking real data in ``citations``.) + if annotations: + joined_text = "\n".join(text_blocks) + annotation_results = cls._results_from_annotations( + annotations, joined_text, limit=limit, + ) + if annotation_results: + return annotation_results + + # Last-ditch: raw citations list (no titles or descriptions). + citations = response_data.get("citations") or [] + if isinstance(citations, list): + return [ + { + "title": "", + "url": str(u), + "description": "", + "position": i + 1, + } + for i, u in enumerate(citations[:limit]) + if isinstance(u, str) and u.strip() + ] + + return [] + + @staticmethod + def _collect_output_text( + response_data: Dict[str, Any], + ) -> tuple[List[str], List[Dict[str, Any]]]: + """Return (text_blocks, annotations) extracted from ``response.output``.""" + text_blocks: List[str] = [] + annotations: List[Dict[str, Any]] = [] + output = response_data.get("output") + if not isinstance(output, list): + return text_blocks, annotations + + for item in output: + if not isinstance(item, dict) or item.get("type") != "message": + continue + content = item.get("content") + if not isinstance(content, list): + continue + for chunk in content: + if not isinstance(chunk, dict) or chunk.get("type") != "output_text": + continue + text = chunk.get("text") + if isinstance(text, str) and text.strip(): + text_blocks.append(text) + chunk_annotations = chunk.get("annotations") + if isinstance(chunk_annotations, list): + for ann in chunk_annotations: + if isinstance(ann, dict): + annotations.append(ann) + return text_blocks, annotations + + @staticmethod + def _try_parse_json_results( + text: str, + *, + limit: int, + ) -> Optional[List[Dict[str, Any]]]: + """Parse a JSON object with a ``results`` array out of ``text``. + + Returns the normalized result list on success, ``None`` when the + block has no valid JSON object or no ``results`` key. Tolerates + leading/trailing prose because reasoning models sometimes prefix a + short narration even when told not to. + """ + # Try the whole string first — cheapest path when Grok obeys. + candidates = [text] + match = _JSON_BLOCK_RE.search(text) + if match and match.group(0) != text: + candidates.append(match.group(0)) + + for candidate in candidates: + try: + parsed = json.loads(candidate) + except (json.JSONDecodeError, ValueError): + continue + if not isinstance(parsed, dict): + continue + results = parsed.get("results") + if not isinstance(results, list): + continue + normalized: List[Dict[str, Any]] = [] + for row in results[:limit]: + if not isinstance(row, dict): + continue + url = str(row.get("url", "")).strip() + if not url: + continue + normalized.append( + { + "title": str(row.get("title", "")).strip(), + "url": url, + "description": str(row.get("description", "")).strip(), + # Renumber from the kept results, not the raw input + # index, so a dropped malformed row doesn't leave a + # gap in the positions handed back to the agent. + "position": len(normalized) + 1, + } + ) + if normalized: + return normalized + return None + + @staticmethod + def _results_from_annotations( + annotations: List[Dict[str, Any]], + joined_text: str, + *, + limit: int, + ) -> List[Dict[str, Any]]: + """Best-effort fallback when JSON parsing fails. + + Uses each ``url_citation`` annotation's ``url`` (the citation + title is just the integer label, so we don't surface it) and + slices ~200 characters of surrounding text as the description. + """ + seen: set[str] = set() + results: List[Dict[str, Any]] = [] + for ann in annotations: + if ann.get("type") != "url_citation": + continue + url = str(ann.get("url", "")).strip() + if not url or url in seen: + continue + seen.add(url) + + description = "" + start = ann.get("start_index") + end = ann.get("end_index") + if isinstance(start, int) and isinstance(end, int) and 0 <= start < end <= len(joined_text): + window_start = max(0, start - 200) + description = joined_text[window_start:start].strip() + if len(description) > 200: + description = description[-200:].strip() + + results.append( + { + "title": "", + "url": url, + "description": description, + "position": len(results) + 1, + } + ) + if len(results) >= limit: + break + return results + + # -- Setup picker ----------------------------------------------------- + + def get_setup_schema(self) -> Dict[str, Any]: + # Auth resolution is delegated to the shared ``xai_grok`` post_setup + # hook (same one image_gen.xai and tts.xai use) so users see the + # familiar OAuth-or-API-key prompt for every xAI service. + return { + "name": "xAI Web Search (Grok)", + "badge": "paid", + "tag": ( + "Agentic web search via Grok's web_search tool — uses xAI " + "Grok OAuth or XAI_API_KEY." + ), + "env_vars": [], + "post_setup": "xai_grok", + } diff --git a/tests/tools/test_web_providers_xai.py b/tests/tools/test_web_providers_xai.py new file mode 100644 index 0000000000..d5a3deaf68 --- /dev/null +++ b/tests/tools/test_web_providers_xai.py @@ -0,0 +1,767 @@ +"""Tests for the xAI Web Search provider (plugins/web/xai/). + +Covers: +- XAIWebSearchProvider.is_available() — cheap probe (env var + auth.json) +- search() — JSON happy path, annotation fallback, citations fallback, empty results +- search() error paths — HTTP error, request error, missing creds, mutually-exclusive domain filters, + 200-OK error envelope +- Request payload shape — model, tools list, allowed_domains/excluded_domains filters +- OAuth credential resolution end-to-end through tools.xai_http +- _is_backend_available("xai") integration with tools.web_tools +- _get_backend() accepts "xai" as a configured backend +""" +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + + +def _creds(api_key: str = "xai-test-key", base_url: str = "https://api.x.ai/v1") -> dict: + return {"provider": "xai", "api_key": api_key, "base_url": base_url} + + +def _mock_resp(json_data, status_code: int = 200): + m = MagicMock() + m.status_code = status_code + m.json.return_value = json_data + m.raise_for_status = MagicMock() + return m + + +def _responses_payload(text: str, annotations=None, citations=None) -> dict: + """Build a minimal Responses-API reply with one message + output_text block.""" + chunk: dict = {"type": "output_text", "text": text} + if annotations is not None: + chunk["annotations"] = annotations + payload: dict = { + "output": [ + { + "type": "message", + "content": [chunk], + } + ], + } + if citations is not None: + payload["citations"] = citations + return payload + + +# --------------------------------------------------------------------------- +# Provider identity / availability +# --------------------------------------------------------------------------- + + +class TestXAIProviderIdentity: + def test_provider_name(self): + from plugins.web.xai.provider import XAIWebSearchProvider + assert XAIWebSearchProvider().name == "xai" + + def test_implements_web_search_provider(self): + from agent.web_search_provider import WebSearchProvider + from plugins.web.xai.provider import XAIWebSearchProvider + assert issubclass(XAIWebSearchProvider, WebSearchProvider) + + def test_supports_search_only(self): + from plugins.web.xai.provider import XAIWebSearchProvider + p = XAIWebSearchProvider() + assert p.supports_search() is True + assert p.supports_extract() is False + assert p.supports_crawl() is False + + def test_display_name(self): + from plugins.web.xai.provider import XAIWebSearchProvider + assert "Grok" in XAIWebSearchProvider().display_name + + +class TestXAIProviderIsAvailable: + """``is_available()`` MUST be cheap — no network, no token refresh, no + auth-store lock. It runs on every ``hermes tools`` repaint and at + tool-registration time, so any I/O regression here would surface as + visible CLI latency. + """ + + def test_available_via_env_var(self, monkeypatch): + monkeypatch.setenv("XAI_API_KEY", "sk-xai-test") + from plugins.web.xai.provider import XAIWebSearchProvider + assert XAIWebSearchProvider().is_available() is True + + def test_available_via_auth_store(self, monkeypatch, tmp_path): + """Cheap probe should detect xai-oauth tokens in ~/.hermes/auth.json + without invoking the resolver (which can trigger refresh).""" + monkeypatch.delenv("XAI_API_KEY", raising=False) + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + auth_path = tmp_path / "auth.json" + auth_path.write_text(json.dumps({ + "version": 1, + "providers": { + "xai-oauth": {"tokens": {"access_token": "ya29.fake-access-token"}}, + }, + })) + + from plugins.web.xai.provider import XAIWebSearchProvider + assert XAIWebSearchProvider().is_available() is True + + def test_unavailable_when_no_env_and_no_auth_store(self, monkeypatch, tmp_path): + monkeypatch.delenv("XAI_API_KEY", raising=False) + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + # No auth.json written. + from plugins.web.xai.provider import XAIWebSearchProvider + assert XAIWebSearchProvider().is_available() is False + + def test_unavailable_when_auth_store_has_empty_token(self, monkeypatch, tmp_path): + monkeypatch.delenv("XAI_API_KEY", raising=False) + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + auth_path = tmp_path / "auth.json" + auth_path.write_text(json.dumps({ + "version": 1, + "providers": {"xai-oauth": {"tokens": {"access_token": ""}}}, + })) + + from plugins.web.xai.provider import XAIWebSearchProvider + assert XAIWebSearchProvider().is_available() is False + + def test_unavailable_when_auth_store_corrupted(self, monkeypatch, tmp_path): + """A malformed auth.json must not crash availability scans.""" + monkeypatch.delenv("XAI_API_KEY", raising=False) + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + (tmp_path / "auth.json").write_text("not json at all }{") + + from plugins.web.xai.provider import XAIWebSearchProvider + assert XAIWebSearchProvider().is_available() is False + + def test_is_available_does_not_call_resolver(self, monkeypatch): + """Regression guard: ``is_available()`` must NEVER touch the resolver, + because the OAuth resolver can trigger a network refresh.""" + monkeypatch.setenv("XAI_API_KEY", "sk-xai-test") + from plugins.web.xai import provider as xai_provider + + with patch.object( + xai_provider, "resolve_xai_http_credentials", + side_effect=AssertionError("is_available must not call the resolver"), + ): + assert xai_provider.XAIWebSearchProvider().is_available() is True + + +# --------------------------------------------------------------------------- +# search() happy + parse paths +# --------------------------------------------------------------------------- + + +class TestXAIProviderSearchJSONPath: + _GROK_JSON = json.dumps({ + "results": [ + {"title": "xAI", "url": "https://x.ai", "description": "The company."}, + {"title": "Grok docs", "url": "https://docs.x.ai", "description": "API reference."}, + {"title": "Grokipedia", "url": "https://grokipedia.com", "description": "Wiki."}, + ] + }) + + def test_happy_path_normalizes_results(self): + from plugins.web.xai import provider as xai_provider + + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=_mock_resp(_responses_payload(self._GROK_JSON))): + result = xai_provider.XAIWebSearchProvider().search("what is xai", limit=5) + + assert result["success"] is True + web = result["data"]["web"] + assert len(web) == 3 + assert web[0] == { + "title": "xAI", + "url": "https://x.ai", + "description": "The company.", + "position": 1, + } + assert web[2]["position"] == 3 + + def test_limit_truncates_json_results(self): + from plugins.web.xai import provider as xai_provider + + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=_mock_resp(_responses_payload(self._GROK_JSON))): + result = xai_provider.XAIWebSearchProvider().search("x", limit=2) + + assert result["success"] is True + assert len(result["data"]["web"]) == 2 + + def test_parses_json_with_leading_prose(self): + """Reasoning models sometimes narrate before the JSON block; we tolerate it.""" + from plugins.web.xai import provider as xai_provider + + text = "Here are the results:\n" + self._GROK_JSON + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=_mock_resp(_responses_payload(text))): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is True + assert len(result["data"]["web"]) == 3 + + def test_drops_rows_without_url(self): + from plugins.web.xai import provider as xai_provider + + bad_json = json.dumps({ + "results": [ + {"title": "no url", "description": "skip me"}, + {"title": "good", "url": "https://ok.com", "description": "keep"}, + ] + }) + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=_mock_resp(_responses_payload(bad_json))): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is True + web = result["data"]["web"] + assert len(web) == 1 + assert web[0]["url"] == "https://ok.com" + assert web[0]["position"] == 1 + + +class TestXAIProviderSearchFallbacks: + def test_falls_back_to_annotations_when_json_missing(self): + """If Grok ignores the JSON instruction, derive results from url_citation annotations.""" + from plugins.web.xai import provider as xai_provider + + body = "xAI is an AI company founded in 2023. They make Grok." + annotations = [ + { + "type": "url_citation", + "url": "https://x.ai/about", + "title": "1", + "start_index": 4, + "end_index": 9, + }, + { + "type": "url_citation", + "url": "https://docs.x.ai", + "title": "2", + "start_index": 47, + "end_index": 52, + }, + ] + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=_mock_resp(_responses_payload(body, annotations=annotations))): + result = xai_provider.XAIWebSearchProvider().search("xai", limit=5) + + assert result["success"] is True + urls = [r["url"] for r in result["data"]["web"]] + assert urls == ["https://x.ai/about", "https://docs.x.ai"] + assert result["data"]["web"][0]["position"] == 1 + assert result["data"]["web"][1]["position"] == 2 + + def test_falls_back_to_citations_list(self): + """If no JSON and no annotations, derive from top-level citations list.""" + from plugins.web.xai import provider as xai_provider + + payload = _responses_payload("free-form narration", citations=["https://a.com", "https://b.com"]) + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=_mock_resp(payload)): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is True + urls = [r["url"] for r in result["data"]["web"]] + assert urls == ["https://a.com", "https://b.com"] + + def test_annotations_without_url_citations_fall_through_to_citations(self): + """When annotations exist but none are url_citation type (e.g. future + annotation types xAI may add), the citations list MUST still be + consulted — otherwise we'd silently report success-with-no-rows + and mask real data the API provided. + """ + from plugins.web.xai import provider as xai_provider + + body = "Some narration about xAI." + # Non-url_citation annotations only — the fallback shouldn't extract + # any URLs from them, and must defer to the citations list below. + annotations = [ + {"type": "future_citation_type", "url": "https://ignored.example", "title": "x"}, + ] + payload = _responses_payload( + body, + annotations=annotations, + citations=["https://real-fallback.com"], + ) + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=_mock_resp(payload)): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is True + urls = [r["url"] for r in result["data"]["web"]] + assert urls == ["https://real-fallback.com"] + + def test_empty_response_returns_empty_success(self): + from plugins.web.xai import provider as xai_provider + + payload = _responses_payload("", citations=[]) + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=_mock_resp(payload)): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is True + assert result["data"]["web"] == [] + + +# --------------------------------------------------------------------------- +# Request payload shape +# --------------------------------------------------------------------------- + + +class TestXAIProviderRequestShape: + def test_posts_to_responses_endpoint_with_bearer_token(self): + from plugins.web.xai import provider as xai_provider + + captured: dict = {} + + def fake_post(url, **kwargs): + captured["url"] = url + captured["headers"] = kwargs.get("headers", {}) + captured["json"] = kwargs.get("json", {}) + return _mock_resp(_responses_payload(json.dumps({"results": []}))) + + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds("secret-key")), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", side_effect=fake_post): + xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert captured["url"] == "https://api.x.ai/v1/responses" + assert captured["headers"].get("Authorization") == "Bearer secret-key" + body = captured["json"] + # Assert against the module constant rather than the literal value, + # so renaming DEFAULT_MODEL (when xAI deprecates grok-4.3) doesn't + # turn this into a change-detector failure. + assert body["model"] == xai_provider.DEFAULT_MODEL + assert body["tools"] == [{"type": "web_search"}] + assert body["input"][0]["role"] == "user" + # No-inline-citations is opt-in via `include` per xAI Responses docs. + assert "no_inline_citations" in body.get("include", []) + + def test_honors_configured_model(self): + from plugins.web.xai import provider as xai_provider + + captured: dict = {} + + def fake_post(url, **kwargs): + captured["json"] = kwargs.get("json", {}) + return _mock_resp(_responses_payload(json.dumps({"results": []}))) + + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={"model": "grok-4.3-fast"}), \ + patch("httpx.post", side_effect=fake_post): + xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert captured["json"]["model"] == "grok-4.3-fast" + + def test_allowed_domains_passes_through_as_filters(self): + from plugins.web.xai import provider as xai_provider + + captured: dict = {} + + def fake_post(url, **kwargs): + captured["json"] = kwargs.get("json", {}) + return _mock_resp(_responses_payload(json.dumps({"results": []}))) + + cfg = {"allowed_domains": ["x.ai", "grokipedia.com"]} + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value=cfg), \ + patch("httpx.post", side_effect=fake_post): + xai_provider.XAIWebSearchProvider().search("q", limit=5) + + tools = captured["json"]["tools"] + assert tools == [{ + "type": "web_search", + "filters": {"allowed_domains": ["x.ai", "grokipedia.com"]}, + }] + + def test_excluded_domains_passes_through_as_filters(self): + from plugins.web.xai import provider as xai_provider + + captured: dict = {} + + def fake_post(url, **kwargs): + captured["json"] = kwargs.get("json", {}) + return _mock_resp(_responses_payload(json.dumps({"results": []}))) + + cfg = {"excluded_domains": ["spam.com"]} + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value=cfg), \ + patch("httpx.post", side_effect=fake_post): + xai_provider.XAIWebSearchProvider().search("q", limit=5) + + tools = captured["json"]["tools"] + assert tools == [{ + "type": "web_search", + "filters": {"excluded_domains": ["spam.com"]}, + }] + + def test_allowed_domains_capped_at_five(self): + """xAI caps domain filters at 5; we trim silently to avoid 400s.""" + from plugins.web.xai import provider as xai_provider + + captured: dict = {} + + def fake_post(url, **kwargs): + captured["json"] = kwargs.get("json", {}) + return _mock_resp(_responses_payload(json.dumps({"results": []}))) + + cfg = {"allowed_domains": [f"d{i}.com" for i in range(10)]} + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value=cfg), \ + patch("httpx.post", side_effect=fake_post): + xai_provider.XAIWebSearchProvider().search("q", limit=5) + + domains = captured["json"]["tools"][0]["filters"]["allowed_domains"] + assert len(domains) == 5 + + +# --------------------------------------------------------------------------- +# Error paths +# --------------------------------------------------------------------------- + + +class TestXAIProviderSearchErrors: + def test_missing_creds_returns_failure(self): + from plugins.web.xai import provider as xai_provider + + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds("")): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is False + assert "xAI" in result["error"] + + def test_mutually_exclusive_domain_filters_rejected_locally(self): + from plugins.web.xai import provider as xai_provider + + cfg = {"allowed_domains": ["a.com"], "excluded_domains": ["b.com"]} + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value=cfg), \ + patch("httpx.post") as posted: + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is False + assert "cannot both be set" in result["error"] + posted.assert_not_called() + + def test_http_error_returns_failure(self): + import httpx + from plugins.web.xai import provider as xai_provider + + bad = MagicMock() + bad.status_code = 429 + bad.text = "rate limited" + err = httpx.HTTPStatusError("429", request=MagicMock(), response=bad) + + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", side_effect=err): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is False + assert "429" in result["error"] + + def test_request_error_returns_failure(self): + import httpx + from plugins.web.xai import provider as xai_provider + + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", side_effect=httpx.RequestError("boom")): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is False + assert "boom" in result["error"] or "xAI" in result["error"] + + def test_bad_json_response_returns_failure(self): + from plugins.web.xai import provider as xai_provider + + bad = MagicMock() + bad.status_code = 200 + bad.raise_for_status = MagicMock() + bad.json.side_effect = ValueError("not json") + + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=bad): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is False + assert "JSON" in result["error"] + + def test_401_on_oauth_path_triggers_force_refresh_and_retry(self): + """OAuth credentials → 401 must force-refresh and retry once. + + Closes the two-gap scenario the resolver's JWT-exp shortcut doesn't + cover: opaque (non-JWT) tokens and mid-window revocation. We expect + ``httpx.post`` to be called twice with two different Bearer tokens. + """ + import httpx + from plugins.web.xai import provider as xai_provider + + bad = MagicMock() + bad.status_code = 401 + bad.text = "Unauthorized" + unauthorized = httpx.HTTPStatusError("401", request=MagicMock(), response=bad) + + calls = {"posts": [], "refresh_count": 0} + + def fake_post(url, **kwargs): + calls["posts"].append(kwargs.get("headers", {}).get("Authorization")) + if len(calls["posts"]) == 1: + raise unauthorized + return _mock_resp(_responses_payload(json.dumps({"results": []}))) + + def fake_resolve(*, force_refresh=False): + if force_refresh: + calls["refresh_count"] += 1 + return { + "provider": "xai-oauth", + "api_key": "fresh-after-refresh", + "base_url": "https://api.x.ai/v1", + } + return { + "provider": "xai-oauth", + "api_key": "stale-token", + "base_url": "https://api.x.ai/v1", + } + + with patch.object(xai_provider, "resolve_xai_http_credentials", side_effect=fake_resolve), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", side_effect=fake_post): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is True + assert calls["refresh_count"] == 1 + assert calls["posts"] == ["Bearer stale-token", "Bearer fresh-after-refresh"] + + def test_401_on_env_var_path_does_not_retry(self): + """Env-var (XAI_API_KEY) creds can't be refreshed — must not retry.""" + import httpx + from plugins.web.xai import provider as xai_provider + + bad = MagicMock() + bad.status_code = 401 + bad.text = "Unauthorized" + unauthorized = httpx.HTTPStatusError("401", request=MagicMock(), response=bad) + + calls = {"posts": 0, "refreshed": False} + + def fake_post(url, **kwargs): + calls["posts"] += 1 + raise unauthorized + + def fake_resolve(*, force_refresh=False): + if force_refresh: + calls["refreshed"] = True + # provider=="xai" signals env-var path; retry must be skipped. + return {"provider": "xai", "api_key": "sk-env-var-key", "base_url": "https://api.x.ai/v1"} + + with patch.object(xai_provider, "resolve_xai_http_credentials", side_effect=fake_resolve), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", side_effect=fake_post): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is False + assert "401" in result["error"] + assert calls["posts"] == 1 + assert calls["refreshed"] is False + + def test_401_retry_gives_up_when_refresh_returns_same_token(self): + """If the force-refresh returns the same token (refresh-token also + dead), don't loop — surface the 401 to the caller.""" + import httpx + from plugins.web.xai import provider as xai_provider + + bad = MagicMock() + bad.status_code = 401 + bad.text = "Unauthorized" + unauthorized = httpx.HTTPStatusError("401", request=MagicMock(), response=bad) + + calls = {"posts": 0, "refresh_count": 0} + + def fake_post(url, **kwargs): + calls["posts"] += 1 + raise unauthorized + + def fake_resolve(*, force_refresh=False): + if force_refresh: + calls["refresh_count"] += 1 + return { + "provider": "xai-oauth", + "api_key": "same-dead-token", + "base_url": "https://api.x.ai/v1", + } + + with patch.object(xai_provider, "resolve_xai_http_credentials", side_effect=fake_resolve), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", side_effect=fake_post): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is False + assert "401" in result["error"] + # One post, one force-refresh attempt, no second post. + assert calls["posts"] == 1 + assert calls["refresh_count"] == 1 + + def test_non_401_http_error_is_not_retried(self): + """Only 401 is retryable — 429 / 500 / 503 must fail fast so the + agent (or upstream rate-limiter) decides what to do.""" + import httpx + from plugins.web.xai import provider as xai_provider + + bad = MagicMock() + bad.status_code = 500 + bad.text = "internal error" + err = httpx.HTTPStatusError("500", request=MagicMock(), response=bad) + + calls = {"posts": 0, "refreshed": False} + + def fake_post(url, **kwargs): + calls["posts"] += 1 + raise err + + def fake_resolve(*, force_refresh=False): + if force_refresh: + calls["refreshed"] = True + return {"provider": "xai-oauth", "api_key": "tok", "base_url": "https://api.x.ai/v1"} + + with patch.object(xai_provider, "resolve_xai_http_credentials", side_effect=fake_resolve), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", side_effect=fake_post): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is False + assert "500" in result["error"] + assert calls["posts"] == 1 + assert calls["refreshed"] is False + + def test_http_200_with_error_envelope_surfaces_failure(self): + """xAI sometimes returns 200 with ``{"error": {...}}`` (model + overloaded, refusal, etc.). Must be surfaced as a failure rather + than silently masked as success-with-empty-results. + """ + from plugins.web.xai import provider as xai_provider + + payload = {"error": {"message": "model overloaded", "type": "server_error"}} + with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \ + patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", return_value=_mock_resp(payload)): + result = xai_provider.XAIWebSearchProvider().search("q", limit=5) + + assert result["success"] is False + assert "model overloaded" in result["error"] + + +# --------------------------------------------------------------------------- +# Integration with tools/web_tools.py backend wiring +# --------------------------------------------------------------------------- + + +class TestXAIBackendWiring: + def test_is_backend_available_true_via_env_var(self, monkeypatch): + from tools import web_tools + + monkeypatch.setenv("XAI_API_KEY", "sk-xai-test") + assert web_tools._is_backend_available("xai") is True + + def test_is_backend_available_false_when_no_creds(self, monkeypatch, tmp_path): + from tools import web_tools + + monkeypatch.delenv("XAI_API_KEY", raising=False) + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + assert web_tools._is_backend_available("xai") is False + + def test_is_backend_available_does_not_call_resolver(self, monkeypatch): + """Regression guard — `_is_backend_available` runs on every web_search + dispatch and every `hermes tools` repaint. It must not invoke the + OAuth resolver (which can trigger a network refresh).""" + from tools import web_tools + + monkeypatch.setenv("XAI_API_KEY", "sk-xai-test") + with patch( + "tools.xai_http.resolve_xai_http_credentials", + side_effect=AssertionError("must not call resolver"), + ): + assert web_tools._is_backend_available("xai") is True + + def test_configured_backend_xai_accepted(self, monkeypatch): + from tools import web_tools + monkeypatch.setattr(web_tools, "_load_web_config", lambda: {"backend": "xai"}) + assert web_tools._get_backend() == "xai" + + def test_xai_not_in_legacy_backend_candidate_chain(self, monkeypatch): + """The hardcoded ``backend_candidates`` tuple in ``_get_backend()`` + does not include xAI — by design, since the no-config legacy + chain is for users who set env vars but never ran ``hermes tools``, + and we don't want a stray ``XAI_API_KEY`` (perhaps set for chat + inference) to silently re-route web_search through Grok. + + Note: this does NOT prevent the registry's single-provider + shortcut (``agent.web_search_registry._resolve``) from selecting + xAI when it's the only available web provider. That path is the + normal "pick the one provider the user actually configured" + behavior shared by every other backend. + """ + from tools import web_tools + + monkeypatch.setattr(web_tools, "_load_web_config", lambda: {}) + for key in ( + "FIRECRAWL_API_KEY", "FIRECRAWL_API_URL", "PARALLEL_API_KEY", + "TAVILY_API_KEY", "EXA_API_KEY", "SEARXNG_URL", "BRAVE_SEARCH_API_KEY", + ): + monkeypatch.delenv(key, raising=False) + monkeypatch.setenv("XAI_API_KEY", "xai-test-key") + monkeypatch.setattr(web_tools, "_is_tool_gateway_ready", lambda: False) + monkeypatch.setattr(web_tools, "_ddgs_package_importable", lambda: False) + assert web_tools._get_backend() != "xai" + + +# --------------------------------------------------------------------------- +# OAuth credential resolution (end-to-end through tools.xai_http) +# --------------------------------------------------------------------------- + + +class TestXAIProviderOAuthPath: + """Verifies the provider works when credentials come from the OAuth + runtime resolver (``hermes auth`` sign-in) rather than an env-var key. + Patches at the ``hermes_cli.runtime_provider.resolve_runtime_provider`` + boundary so the full ``tools.xai_http.resolve_xai_http_credentials`` + chain is exercised end-to-end. + """ + + def test_search_uses_oauth_bearer_token_and_base_url(self, monkeypatch): + from plugins.web.xai import provider as xai_provider + + # Force the env-var fallback to fail so resolution must go via OAuth. + monkeypatch.delenv("XAI_API_KEY", raising=False) + + oauth_runtime = { + "provider": "xai-oauth", + "api_mode": "codex_responses", + "base_url": "https://api.x.ai/v1", + "api_key": "ya29.fake-oauth-access-token", + "source": "hermes-auth-store", + } + + captured: dict = {} + + def fake_post(url, **kwargs): + captured["url"] = url + captured["headers"] = kwargs.get("headers", {}) + return _mock_resp(_responses_payload(json.dumps({"results": []}))) + + with patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value=oauth_runtime, + ), patch.object(xai_provider, "_load_xai_web_config", return_value={}), \ + patch("httpx.post", side_effect=fake_post): + result = xai_provider.XAIWebSearchProvider().search("q", limit=3) + + assert result["success"] is True + assert captured["url"] == "https://api.x.ai/v1/responses" + assert captured["headers"].get("Authorization") == "Bearer ya29.fake-oauth-access-token" diff --git a/tools/web_tools.py b/tools/web_tools.py index 597edb0c8f..a55fe78c41 100644 --- a/tools/web_tools.py +++ b/tools/web_tools.py @@ -140,7 +140,7 @@ def _get_backend() -> str: keys manually without running setup. """ configured = (_load_web_config().get("backend") or "").lower().strip() - if configured in {"parallel", "firecrawl", "tavily", "exa", "searxng", "brave-free", "ddgs"}: + if configured in {"parallel", "firecrawl", "tavily", "exa", "searxng", "brave-free", "ddgs", "xai"}: return configured # Fallback for manual / legacy config — pick the highest-priority @@ -218,6 +218,16 @@ def _is_backend_available(backend: str) -> bool: return _has_env("BRAVE_SEARCH_API_KEY") if backend == "ddgs": return _ddgs_package_importable() + if backend == "xai": + # Cheap probe — env var OR auth.json has OAuth tokens. Must not + # call resolve_xai_http_credentials() here because the OAuth path + # can trigger a network token refresh, and _is_backend_available + # runs on every web_search dispatch + every `hermes tools` repaint. + try: + from tools.xai_http import has_xai_credentials + return has_xai_credentials() + except Exception: + return False return False diff --git a/tools/xai_http.py b/tools/xai_http.py index 848ad8fc74..8e94b64aa4 100644 --- a/tools/xai_http.py +++ b/tools/xai_http.py @@ -2,9 +2,49 @@ from __future__ import annotations +import json import os from typing import Dict + +def has_xai_credentials() -> bool: + """Cheap probe — return True when xAI credentials are *likely* usable. + + Deliberately avoids :func:`resolve_xai_http_credentials` so callers in + hot-paint paths (``hermes tools`` repaint, tool-registration scans, + ``WebSearchProvider.is_available()``) don't incur disk locks or — in + the OAuth path — a network token refresh. The ABC contract on + :meth:`agent.web_search_provider.WebSearchProvider.is_available` + explicitly forbids network calls for exactly this reason. + + Resolution order, fast-to-slow: + + 1. ``XAI_API_KEY`` env var (cheapest; covers explicit-key users). + 2. ``~/.hermes/auth.json`` has a non-empty ``providers.xai-oauth.tokens.access_token`` + (single file read, no expiry check, no refresh). + + Returns False on any exception so a corrupted auth store can't block + other availability scans. Truthful refresh + expiry handling happens + in ``search()`` (or whichever caller actually makes the request). + """ + if os.environ.get("XAI_API_KEY", "").strip(): + return True + try: + from hermes_constants import get_hermes_home + + auth_path = get_hermes_home() / "auth.json" + if not auth_path.exists(): + return False + store = json.loads(auth_path.read_text()) + providers = store.get("providers") if isinstance(store, dict) else None + xai_state = providers.get("xai-oauth") if isinstance(providers, dict) else None + tokens = xai_state.get("tokens") if isinstance(xai_state, dict) else None + access_token = tokens.get("access_token") if isinstance(tokens, dict) else None + return bool(str(access_token or "").strip()) + except Exception: + return False + + def get_env_value(name: str, default=None): """Read ``name`` from ``~/.hermes/.env`` first, then ``os.environ``. @@ -32,7 +72,7 @@ def hermes_xai_user_agent() -> str: return f"Hermes-Agent/{__version__}" -def resolve_xai_http_credentials() -> Dict[str, str]: +def resolve_xai_http_credentials(*, force_refresh: bool = False) -> Dict[str, str]: """Resolve bearer credentials for direct xAI HTTP endpoints. Prefers Hermes-managed xAI OAuth credentials when available, then falls back @@ -41,26 +81,33 @@ def resolve_xai_http_credentials() -> Dict[str, str]: not just ones already exported into ``os.environ``. This keeps direct xAI endpoints (images, TTS, STT, etc.) aligned with the main runtime auth model and preserves the regression contract from PR #17140 / #17163. - """ - try: - from hermes_cli.runtime_provider import resolve_runtime_provider - runtime = resolve_runtime_provider(requested="xai-oauth") - access_token = str(runtime.get("api_key") or "").strip() - base_url = str(runtime.get("base_url") or "").strip().rstrip("/") - if access_token: - return { - "provider": "xai-oauth", - "api_key": access_token, - "base_url": base_url or "https://api.x.ai/v1", - } - except Exception: - pass + Set ``force_refresh=True`` to bypass the resolver's JWT-exp shortcut and + perform an unconditional OAuth refresh. Callers should use this only as a + reactive remediation after a server 401 (mid-window revocation, opaque + tokens where the proactive JWT check is a no-op, etc.), not as a default — + the auth-store lock is held for the duration of the refresh. + """ + if not force_refresh: + try: + from hermes_cli.runtime_provider import resolve_runtime_provider + + runtime = resolve_runtime_provider(requested="xai-oauth") + access_token = str(runtime.get("api_key") or "").strip() + base_url = str(runtime.get("base_url") or "").strip().rstrip("/") + if access_token: + return { + "provider": "xai-oauth", + "api_key": access_token, + "base_url": base_url or "https://api.x.ai/v1", + } + except Exception: + pass try: from hermes_cli.auth import resolve_xai_oauth_runtime_credentials - creds = resolve_xai_oauth_runtime_credentials() + creds = resolve_xai_oauth_runtime_credentials(force_refresh=force_refresh) access_token = str(creds.get("api_key") or "").strip() base_url = str(creds.get("base_url") or "").strip().rstrip("/") if access_token: