mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-20 13:32:30 +00:00
feat(web): add xAI Web Search provider plugin
Adds a new bundled web search provider plugin backed by xAI's agentic Web Search tool (server-side `web_search` on the Responses API). Slots in alongside the existing Firecrawl / Tavily / Exa / Brave / SearXNG / DDGS providers; opt in via `web.backend: xai` (or auto-selected by the registry's single-provider shortcut when it's the only available web provider, matching every other backend's behavior). Reuses the existing xAI HTTP credential plumbing (`tools/xai_http.py`) so it works with both `hermes auth login xai-oauth` (SuperGrok OAuth) and `XAI_API_KEY` — no new credential paths, no new env vars, no new setup-wizard prompts. The existing `xai_grok` post_setup hook handles credential collection. Reference: https://docs.x.ai/developers/tools/web-search Provider behavior ----------------- - Sends a structured prompt to Grok with `tools=[{"type": "web_search"}]` enabled and `include=["no_inline_citations"]`, then parses results from a `{"results": [...]}` JSON block (primary), falling back to `url_citation` annotations (secondary) and the top-level `citations` list (last-ditch). Annotation fallback falls through to citations when no rows are extractable, so future annotation types xAI may add don't silently mask real data. - HTTP 200 + `{"error": {...}}` envelopes (model-overload, refusal) are surfaced as failures rather than masked as success-with-empty- results. - HTTP 401 on the OAuth path triggers a single `force_refresh=True` retry — closes two gaps the resolver's proactive JWT-exp shortcut doesn't cover: opaque (non-JWT) access tokens and mid-window revocation. Env-var (`XAI_API_KEY`) credentials never retry; they can't be refreshed and an immediate retry would just burn quota. - `is_available()` is a cheap probe (env var OR auth.json read), never invokes the OAuth resolver — required by the ABC contract because it runs on every `hermes tools` repaint and at tool-registration time. - Class docstring documents the LLM-in-a-trench-coat trust model so callers piping untrusted input into `web_search` know returned URLs are model-generated and should be validated before fetching. Config (`config.yaml`): web: backend: xai xai: model: grok-4.3 # optional, defaults to grok-4.3 allowed_domains: # optional, max 5 — mutex with excluded_domains - arxiv.org excluded_domains: # optional, max 5 - example-spam.com timeout: 90 # optional, seconds Files ----- - plugins/web/xai/plugin.yaml (new) plugin manifest - plugins/web/xai/__init__.py (new) register(ctx) hook - plugins/web/xai/provider.py (new) XAIWebSearchProvider impl - tools/xai_http.py (+47) has_xai_credentials() cheap-probe helper + keyword-only force_refresh arg on resolve_xai_http_ credentials() (backwards compatible; all 9 other call sites unaffected) - tools/web_tools.py (+11) "xai" added to configured- backend set + branch in _is_backend_available() - tests/tools/test_web_providers_xai.py (new, 39 tests) covers identity, cheap-probe semantics, JSON / annotation / citations parse paths, request payload shape, error envelopes, OAuth force-refresh-on-401 retry, env-var-no-retry guard, 500-not- retried guard, refresh-returns- same-token guard, OAuth runtime resolution, and backend wiring. Tests ----- - 39 xai-suite passes - 79 sibling web-provider tests (brave-free, ddgs, searxng, base) pass - 119 cross-suite tests for other xai_http callers (transcription, x_search, tts) pass — verifies the new keyword-only arg is BC - scripts/check-windows-footguns.py: clean on all 5 modified files No edits to run_agent.py, cli.py, gateway/, toolsets, config schema, plugin core, or auth core.
This commit is contained in:
@@ -0,0 +1,14 @@
|
||||
"""xAI web search plugin — bundled, auto-loaded.
|
||||
|
||||
Mirrors the ``plugins/web/brave_free/`` layout: ``provider.py`` holds the
|
||||
provider class, ``__init__.py::register(ctx)`` registers an instance.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
|
||||
|
||||
def register(ctx) -> None:
|
||||
"""Register the xAI Web Search provider with the plugin context."""
|
||||
ctx.register_web_search_provider(XAIWebSearchProvider())
|
||||
@@ -0,0 +1,7 @@
|
||||
name: web-xai
|
||||
version: 1.0.0
|
||||
description: "xAI Web Search — search the web via Grok's agentic web_search tool (Responses API). Requires xAI Grok OAuth (via `hermes auth`) or XAI_API_KEY (https://x.ai)."
|
||||
author: NousResearch
|
||||
kind: backend
|
||||
provides_web_providers:
|
||||
- xai
|
||||
@@ -0,0 +1,560 @@
|
||||
"""xAI Web Search — plugin form.
|
||||
|
||||
Routes ``web_search`` tool calls through xAI's agentic Web Search tool
|
||||
(server-side ``web_search`` on the Responses API). Grok runs the actual
|
||||
searching and page-browsing server-side; we ask it to return the top
|
||||
results as structured JSON so we can hand back the same
|
||||
``{title, url, description, position}`` rows every other Hermes web
|
||||
provider produces.
|
||||
|
||||
Reference: https://docs.x.ai/developers/tools/web-search
|
||||
|
||||
Config keys this provider responds to::
|
||||
|
||||
web:
|
||||
search_backend: "xai" # explicit per-capability
|
||||
backend: "xai" # shared fallback
|
||||
|
||||
Optional knobs (under ``web.xai`` in ``config.yaml``)::
|
||||
|
||||
web:
|
||||
xai:
|
||||
model: "grok-4.3" # reasoning model required by web_search
|
||||
allowed_domains: ["x.ai"] # max 5 — mutually exclusive with excluded_domains
|
||||
excluded_domains: ["bad.com"] # max 5 — mutually exclusive with allowed_domains
|
||||
timeout: 90 # seconds (default 90)
|
||||
|
||||
Auth: reuses :func:`tools.xai_http.resolve_xai_http_credentials`, which
|
||||
prefers Hermes-managed xAI Grok OAuth (via ``hermes auth``) and falls back
|
||||
to ``XAI_API_KEY`` (resolved through ``~/.hermes/.env``, then
|
||||
``os.environ``).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from agent.web_search_provider import WebSearchProvider
|
||||
from tools.xai_http import (
|
||||
has_xai_credentials,
|
||||
hermes_xai_user_agent,
|
||||
resolve_xai_http_credentials,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_MODEL = "grok-4.3"
|
||||
DEFAULT_TIMEOUT = 90
|
||||
_MAX_DOMAIN_FILTERS = 5 # xAI hard cap on allowed_domains / excluded_domains
|
||||
|
||||
# Match the JSON object Grok is asked to emit. Tolerates leading/trailing
|
||||
# prose since reasoning models occasionally narrate before the JSON block
|
||||
# even when explicitly asked not to.
|
||||
_JSON_BLOCK_RE = re.compile(r"\{[\s\S]*\}", re.MULTILINE)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _load_xai_web_config() -> Dict[str, Any]:
|
||||
"""Read ``web.xai`` from config.yaml (returns {} on miss)."""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
web_section = cfg.get("web") if isinstance(cfg, dict) else None
|
||||
xai_section = web_section.get("xai") if isinstance(web_section, dict) else None
|
||||
return xai_section if isinstance(xai_section, dict) else {}
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.debug("Could not load web.xai config: %s", exc)
|
||||
return {}
|
||||
|
||||
|
||||
def _coerce_domain_list(value: Any) -> List[str]:
|
||||
"""Coerce a config value to a clean list of <=5 domain strings."""
|
||||
if not isinstance(value, list):
|
||||
return []
|
||||
cleaned: List[str] = []
|
||||
for item in value:
|
||||
if isinstance(item, str) and item.strip():
|
||||
cleaned.append(item.strip())
|
||||
if len(cleaned) >= _MAX_DOMAIN_FILTERS:
|
||||
break
|
||||
return cleaned
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provider
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class XAIWebSearchProvider(WebSearchProvider):
|
||||
"""Search-only provider backed by xAI's agentic Web Search tool.
|
||||
|
||||
Sends a structured prompt to Grok with ``tools=[{"type": "web_search"}]``
|
||||
enabled and asks it to return the top *limit* results as JSON. Falls
|
||||
back to the Responses API ``citations`` list if Grok ignores the JSON
|
||||
schema instruction (rare for grok-4.3 but cheap insurance).
|
||||
|
||||
No extract capability — pair with Firecrawl / Tavily / Exa for
|
||||
``web_extract`` if you need page content.
|
||||
|
||||
Trust model
|
||||
-----------
|
||||
Unlike index-backed providers (Brave / Tavily / Exa) which return
|
||||
verbatim search-engine results, this backend is an LLM in a trench
|
||||
coat: Grok decides which URLs to surface, generates the titles and
|
||||
descriptions itself, and is influenced by the *content of the query*.
|
||||
A maliciously crafted query (e.g. injected via untrusted upstream
|
||||
input the agent picked up) can in principle steer Grok into emitting
|
||||
attacker-chosen URLs. Callers that pipe untrusted text directly into
|
||||
``web_search`` should treat returned URLs the same way they would
|
||||
treat any model-generated link — validate before fetching.
|
||||
"""
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "xai"
|
||||
|
||||
@property
|
||||
def display_name(self) -> str:
|
||||
return "xAI Web Search (Grok)"
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Cheap availability probe — env var OR auth-store has OAuth tokens.
|
||||
|
||||
Delegates to :func:`tools.xai_http.has_xai_credentials`, which is
|
||||
deliberately *not* the same as :func:`resolve_xai_http_credentials`:
|
||||
it never triggers OAuth token refresh or acquires the auth-store
|
||||
lock. The ABC contract requires this method to be safe to call on
|
||||
every ``hermes tools`` repaint and at tool-registration time.
|
||||
Token freshness / refresh is handled inside :meth:`search`.
|
||||
"""
|
||||
return has_xai_credentials()
|
||||
|
||||
def supports_search(self) -> bool:
|
||||
return True
|
||||
|
||||
def supports_extract(self) -> bool:
|
||||
return False
|
||||
|
||||
def supports_crawl(self) -> bool:
|
||||
return False
|
||||
|
||||
# -- Search -----------------------------------------------------------
|
||||
|
||||
def search(self, query: str, limit: int = 5) -> Dict[str, Any]:
|
||||
"""Execute a Grok-backed web search.
|
||||
|
||||
Returns ``{"success": True, "data": {"web": [{title, url, description, position}, ...]}}``
|
||||
on success, ``{"success": False, "error": str}`` on failure.
|
||||
"""
|
||||
try:
|
||||
from tools.interrupt import is_interrupted
|
||||
|
||||
if is_interrupted():
|
||||
return {"success": False, "error": "Interrupted"}
|
||||
except Exception: # noqa: BLE001 — interrupt module is best-effort
|
||||
pass
|
||||
|
||||
creds = resolve_xai_http_credentials()
|
||||
api_key = str(creds.get("api_key") or "").strip()
|
||||
base_url = str(creds.get("base_url") or "https://api.x.ai/v1").strip().rstrip("/")
|
||||
if not api_key:
|
||||
return {
|
||||
"success": False,
|
||||
"error": (
|
||||
"No xAI credentials found. Run `hermes auth` to sign in with "
|
||||
"xAI Grok OAuth, or set XAI_API_KEY."
|
||||
),
|
||||
}
|
||||
|
||||
# Clamp limit to the same range the caller (web_search_tool) accepts,
|
||||
# so we don't silently downgrade explicit limits. Grok happily
|
||||
# produces longer lists; cost scales linearly with the requested
|
||||
# count via reasoning tokens, but that's the caller's call to make.
|
||||
try:
|
||||
limit = int(limit)
|
||||
except (TypeError, ValueError):
|
||||
limit = 5
|
||||
limit = max(1, min(limit, 100))
|
||||
|
||||
cfg = _load_xai_web_config()
|
||||
model = cfg.get("model") if isinstance(cfg.get("model"), str) else DEFAULT_MODEL
|
||||
model = model.strip() or DEFAULT_MODEL
|
||||
|
||||
try:
|
||||
timeout = float(cfg.get("timeout", DEFAULT_TIMEOUT))
|
||||
except (TypeError, ValueError):
|
||||
timeout = DEFAULT_TIMEOUT
|
||||
|
||||
allowed = _coerce_domain_list(cfg.get("allowed_domains"))
|
||||
excluded = _coerce_domain_list(cfg.get("excluded_domains"))
|
||||
if allowed and excluded:
|
||||
# xAI explicitly rejects this combo — surface a clear error
|
||||
# rather than a 400 from the API.
|
||||
return {
|
||||
"success": False,
|
||||
"error": (
|
||||
"web.xai.allowed_domains and web.xai.excluded_domains "
|
||||
"cannot both be set (xAI restriction)."
|
||||
),
|
||||
}
|
||||
|
||||
web_search_tool: Dict[str, Any] = {"type": "web_search"}
|
||||
if allowed:
|
||||
web_search_tool["filters"] = {"allowed_domains": allowed}
|
||||
elif excluded:
|
||||
web_search_tool["filters"] = {"excluded_domains": excluded}
|
||||
|
||||
prompt = self._build_prompt(query, limit)
|
||||
|
||||
payload: Dict[str, Any] = {
|
||||
"model": model,
|
||||
"input": [{"role": "user", "content": prompt}],
|
||||
"tools": [web_search_tool],
|
||||
# Drop inline citation markdown — we want the JSON block clean,
|
||||
# and we read URLs from annotations / citations separately.
|
||||
"include": ["no_inline_citations"],
|
||||
}
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {api_key}",
|
||||
"Content-Type": "application/json",
|
||||
"User-Agent": hermes_xai_user_agent(),
|
||||
}
|
||||
|
||||
try:
|
||||
import httpx
|
||||
except ImportError:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "httpx is not installed (required for xAI web search)",
|
||||
}
|
||||
|
||||
logger.info(
|
||||
"xAI web search via %s: '%s' (limit=%d, model=%s)",
|
||||
base_url, query, limit, model,
|
||||
)
|
||||
|
||||
# Two-attempt loop: if the first call returns 401 and our creds came
|
||||
# from the OAuth path, force-refresh the token once and retry. This
|
||||
# closes two gaps the proactive resolver check doesn't cover:
|
||||
# (1) opaque (non-JWT) access tokens — `_xai_access_token_is_expiring`
|
||||
# can't decode them and returns False, so refresh never fires
|
||||
# until the server hands us a 401.
|
||||
# (2) mid-window revocation — admin revoke, refresh-token rotation,
|
||||
# or clock skew can produce 401s on a token whose JWT `exp` claim
|
||||
# is still in the future.
|
||||
# Env-var (`XAI_API_KEY`) credentials skip the retry entirely — we
|
||||
# can't refresh those and an immediate retry would just burn quota.
|
||||
is_oauth_path = (creds.get("provider") == "xai-oauth")
|
||||
resp = None
|
||||
for attempt in range(2):
|
||||
try:
|
||||
resp = httpx.post(
|
||||
f"{base_url}/responses",
|
||||
headers=headers,
|
||||
json=payload,
|
||||
timeout=timeout,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
break
|
||||
except httpx.HTTPStatusError as exc:
|
||||
status = exc.response.status_code if exc.response is not None else 0
|
||||
if status == 401 and attempt == 0 and is_oauth_path:
|
||||
logger.info(
|
||||
"xAI web search got 401 on first attempt; forcing OAuth "
|
||||
"refresh and retrying once.",
|
||||
)
|
||||
try:
|
||||
refreshed = resolve_xai_http_credentials(force_refresh=True)
|
||||
refreshed_key = str(refreshed.get("api_key") or "").strip()
|
||||
if refreshed_key and refreshed_key != api_key:
|
||||
api_key = refreshed_key
|
||||
headers["Authorization"] = f"Bearer {api_key}"
|
||||
continue
|
||||
# Refresh returned the same (or empty) token — no point
|
||||
# in retrying. Fall through to the error return below.
|
||||
except Exception as refresh_exc: # noqa: BLE001
|
||||
logger.warning(
|
||||
"xAI web search OAuth refresh after 401 failed: %s",
|
||||
refresh_exc,
|
||||
)
|
||||
body = ""
|
||||
try:
|
||||
body = exc.response.text[:300] if exc.response is not None else ""
|
||||
except Exception:
|
||||
body = ""
|
||||
logger.warning("xAI web search HTTP %d: %s", status, body)
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"xAI web search returned HTTP {status}: {body}".rstrip(),
|
||||
}
|
||||
except httpx.RequestError as exc:
|
||||
logger.warning("xAI web search request error: %s", exc)
|
||||
return {"success": False, "error": f"Could not reach xAI: {exc}"}
|
||||
|
||||
if resp is None:
|
||||
# Defensive — both attempts somehow exited the loop without resp.
|
||||
return {"success": False, "error": "xAI web search produced no response"}
|
||||
|
||||
try:
|
||||
data = resp.json()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("xAI web search bad JSON: %s", exc)
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Could not parse xAI Responses API reply as JSON",
|
||||
}
|
||||
|
||||
# xAI's Responses surface sometimes returns HTTP 200 with an error
|
||||
# envelope (model overloaded, content-policy refusal, etc.). Without
|
||||
# this check, ``_extract_results`` would silently produce an empty
|
||||
# list and we'd report success-with-no-rows — masking a real failure
|
||||
# the agent should see and decide whether to retry.
|
||||
api_error = data.get("error") if isinstance(data, dict) else None
|
||||
if isinstance(api_error, dict):
|
||||
err_msg = (
|
||||
api_error.get("message")
|
||||
or api_error.get("code")
|
||||
or "unknown error"
|
||||
)
|
||||
logger.warning("xAI web search returned error envelope: %s", err_msg)
|
||||
return {"success": False, "error": f"xAI returned an error: {err_msg}"}
|
||||
|
||||
web_results = self._extract_results(data, limit=limit)
|
||||
if not web_results:
|
||||
# Successful call, just no usable rows — return success with an
|
||||
# empty list so the model can decide whether to retry. Matches
|
||||
# what brave-free / exa do when the upstream API returns 0 hits.
|
||||
return {"success": True, "data": {"web": []}}
|
||||
|
||||
return {"success": True, "data": {"web": web_results}}
|
||||
|
||||
# -- Prompt + parsing -------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _build_prompt(query: str, limit: int) -> str:
|
||||
"""Compose the prompt that asks Grok to act as a search engine.
|
||||
|
||||
We deliberately ask for a JSON object (not bare array) so we can
|
||||
match it cheaply with ``_JSON_BLOCK_RE``; we explicitly forbid
|
||||
prose, markdown fences, and inline-citation links to keep the
|
||||
payload parseable.
|
||||
"""
|
||||
return (
|
||||
"Use the web_search tool to find current information for the query below, "
|
||||
"then respond with ONLY a single JSON object — no prose, no markdown "
|
||||
"fences, no inline citation links — matching this exact schema:\n\n"
|
||||
'{"results": [{"title": "string", "url": "string", '
|
||||
'"description": "1-2 sentence summary"}]}\n\n'
|
||||
f'Return at most {limit} results, ordered by relevance, with absolute '
|
||||
"https:// URLs. If no usable results exist, return "
|
||||
'{"results": []}.\n\n'
|
||||
f"Query: {query}"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _extract_results(
|
||||
cls,
|
||||
response_data: Dict[str, Any],
|
||||
*,
|
||||
limit: int,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Pull a ``[{title, url, description, position}, ...]`` list out of a
|
||||
Responses-API reply.
|
||||
|
||||
Strategy:
|
||||
|
||||
1. Walk ``output[*].content[*].text`` for ``output_text`` blocks and
|
||||
try to parse the first JSON object that has a ``results`` list.
|
||||
2. If the JSON path fails, fall back to the message annotations
|
||||
(``url_citation`` entries) — every annotation carries a URL and
|
||||
a ``title`` (citation number); we pair those URLs with surrounding
|
||||
text from the message body as a best-effort description.
|
||||
"""
|
||||
text_blocks, annotations = cls._collect_output_text(response_data)
|
||||
|
||||
# Primary path: parse the JSON object Grok was asked for.
|
||||
for block in text_blocks:
|
||||
parsed = cls._try_parse_json_results(block, limit=limit)
|
||||
if parsed:
|
||||
return parsed
|
||||
|
||||
# Secondary path: derive results from message annotations + raw text.
|
||||
# Only short-circuit when annotations actually yielded usable rows;
|
||||
# otherwise fall through to the citations list. (xAI currently only
|
||||
# emits ``url_citation`` annotations, but future annotation types
|
||||
# would silently produce an empty result set if we returned here
|
||||
# unconditionally — masking real data in ``citations``.)
|
||||
if annotations:
|
||||
joined_text = "\n".join(text_blocks)
|
||||
annotation_results = cls._results_from_annotations(
|
||||
annotations, joined_text, limit=limit,
|
||||
)
|
||||
if annotation_results:
|
||||
return annotation_results
|
||||
|
||||
# Last-ditch: raw citations list (no titles or descriptions).
|
||||
citations = response_data.get("citations") or []
|
||||
if isinstance(citations, list):
|
||||
return [
|
||||
{
|
||||
"title": "",
|
||||
"url": str(u),
|
||||
"description": "",
|
||||
"position": i + 1,
|
||||
}
|
||||
for i, u in enumerate(citations[:limit])
|
||||
if isinstance(u, str) and u.strip()
|
||||
]
|
||||
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def _collect_output_text(
|
||||
response_data: Dict[str, Any],
|
||||
) -> tuple[List[str], List[Dict[str, Any]]]:
|
||||
"""Return (text_blocks, annotations) extracted from ``response.output``."""
|
||||
text_blocks: List[str] = []
|
||||
annotations: List[Dict[str, Any]] = []
|
||||
output = response_data.get("output")
|
||||
if not isinstance(output, list):
|
||||
return text_blocks, annotations
|
||||
|
||||
for item in output:
|
||||
if not isinstance(item, dict) or item.get("type") != "message":
|
||||
continue
|
||||
content = item.get("content")
|
||||
if not isinstance(content, list):
|
||||
continue
|
||||
for chunk in content:
|
||||
if not isinstance(chunk, dict) or chunk.get("type") != "output_text":
|
||||
continue
|
||||
text = chunk.get("text")
|
||||
if isinstance(text, str) and text.strip():
|
||||
text_blocks.append(text)
|
||||
chunk_annotations = chunk.get("annotations")
|
||||
if isinstance(chunk_annotations, list):
|
||||
for ann in chunk_annotations:
|
||||
if isinstance(ann, dict):
|
||||
annotations.append(ann)
|
||||
return text_blocks, annotations
|
||||
|
||||
@staticmethod
|
||||
def _try_parse_json_results(
|
||||
text: str,
|
||||
*,
|
||||
limit: int,
|
||||
) -> Optional[List[Dict[str, Any]]]:
|
||||
"""Parse a JSON object with a ``results`` array out of ``text``.
|
||||
|
||||
Returns the normalized result list on success, ``None`` when the
|
||||
block has no valid JSON object or no ``results`` key. Tolerates
|
||||
leading/trailing prose because reasoning models sometimes prefix a
|
||||
short narration even when told not to.
|
||||
"""
|
||||
# Try the whole string first — cheapest path when Grok obeys.
|
||||
candidates = [text]
|
||||
match = _JSON_BLOCK_RE.search(text)
|
||||
if match and match.group(0) != text:
|
||||
candidates.append(match.group(0))
|
||||
|
||||
for candidate in candidates:
|
||||
try:
|
||||
parsed = json.loads(candidate)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
if not isinstance(parsed, dict):
|
||||
continue
|
||||
results = parsed.get("results")
|
||||
if not isinstance(results, list):
|
||||
continue
|
||||
normalized: List[Dict[str, Any]] = []
|
||||
for row in results[:limit]:
|
||||
if not isinstance(row, dict):
|
||||
continue
|
||||
url = str(row.get("url", "")).strip()
|
||||
if not url:
|
||||
continue
|
||||
normalized.append(
|
||||
{
|
||||
"title": str(row.get("title", "")).strip(),
|
||||
"url": url,
|
||||
"description": str(row.get("description", "")).strip(),
|
||||
# Renumber from the kept results, not the raw input
|
||||
# index, so a dropped malformed row doesn't leave a
|
||||
# gap in the positions handed back to the agent.
|
||||
"position": len(normalized) + 1,
|
||||
}
|
||||
)
|
||||
if normalized:
|
||||
return normalized
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _results_from_annotations(
|
||||
annotations: List[Dict[str, Any]],
|
||||
joined_text: str,
|
||||
*,
|
||||
limit: int,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Best-effort fallback when JSON parsing fails.
|
||||
|
||||
Uses each ``url_citation`` annotation's ``url`` (the citation
|
||||
title is just the integer label, so we don't surface it) and
|
||||
slices ~200 characters of surrounding text as the description.
|
||||
"""
|
||||
seen: set[str] = set()
|
||||
results: List[Dict[str, Any]] = []
|
||||
for ann in annotations:
|
||||
if ann.get("type") != "url_citation":
|
||||
continue
|
||||
url = str(ann.get("url", "")).strip()
|
||||
if not url or url in seen:
|
||||
continue
|
||||
seen.add(url)
|
||||
|
||||
description = ""
|
||||
start = ann.get("start_index")
|
||||
end = ann.get("end_index")
|
||||
if isinstance(start, int) and isinstance(end, int) and 0 <= start < end <= len(joined_text):
|
||||
window_start = max(0, start - 200)
|
||||
description = joined_text[window_start:start].strip()
|
||||
if len(description) > 200:
|
||||
description = description[-200:].strip()
|
||||
|
||||
results.append(
|
||||
{
|
||||
"title": "",
|
||||
"url": url,
|
||||
"description": description,
|
||||
"position": len(results) + 1,
|
||||
}
|
||||
)
|
||||
if len(results) >= limit:
|
||||
break
|
||||
return results
|
||||
|
||||
# -- Setup picker -----------------------------------------------------
|
||||
|
||||
def get_setup_schema(self) -> Dict[str, Any]:
|
||||
# Auth resolution is delegated to the shared ``xai_grok`` post_setup
|
||||
# hook (same one image_gen.xai and tts.xai use) so users see the
|
||||
# familiar OAuth-or-API-key prompt for every xAI service.
|
||||
return {
|
||||
"name": "xAI Web Search (Grok)",
|
||||
"badge": "paid",
|
||||
"tag": (
|
||||
"Agentic web search via Grok's web_search tool — uses xAI "
|
||||
"Grok OAuth or XAI_API_KEY."
|
||||
),
|
||||
"env_vars": [],
|
||||
"post_setup": "xai_grok",
|
||||
}
|
||||
@@ -0,0 +1,767 @@
|
||||
"""Tests for the xAI Web Search provider (plugins/web/xai/).
|
||||
|
||||
Covers:
|
||||
- XAIWebSearchProvider.is_available() — cheap probe (env var + auth.json)
|
||||
- search() — JSON happy path, annotation fallback, citations fallback, empty results
|
||||
- search() error paths — HTTP error, request error, missing creds, mutually-exclusive domain filters,
|
||||
200-OK error envelope
|
||||
- Request payload shape — model, tools list, allowed_domains/excluded_domains filters
|
||||
- OAuth credential resolution end-to-end through tools.xai_http
|
||||
- _is_backend_available("xai") integration with tools.web_tools
|
||||
- _get_backend() accepts "xai" as a configured backend
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
def _creds(api_key: str = "xai-test-key", base_url: str = "https://api.x.ai/v1") -> dict:
|
||||
return {"provider": "xai", "api_key": api_key, "base_url": base_url}
|
||||
|
||||
|
||||
def _mock_resp(json_data, status_code: int = 200):
|
||||
m = MagicMock()
|
||||
m.status_code = status_code
|
||||
m.json.return_value = json_data
|
||||
m.raise_for_status = MagicMock()
|
||||
return m
|
||||
|
||||
|
||||
def _responses_payload(text: str, annotations=None, citations=None) -> dict:
|
||||
"""Build a minimal Responses-API reply with one message + output_text block."""
|
||||
chunk: dict = {"type": "output_text", "text": text}
|
||||
if annotations is not None:
|
||||
chunk["annotations"] = annotations
|
||||
payload: dict = {
|
||||
"output": [
|
||||
{
|
||||
"type": "message",
|
||||
"content": [chunk],
|
||||
}
|
||||
],
|
||||
}
|
||||
if citations is not None:
|
||||
payload["citations"] = citations
|
||||
return payload
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provider identity / availability
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestXAIProviderIdentity:
|
||||
def test_provider_name(self):
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
assert XAIWebSearchProvider().name == "xai"
|
||||
|
||||
def test_implements_web_search_provider(self):
|
||||
from agent.web_search_provider import WebSearchProvider
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
assert issubclass(XAIWebSearchProvider, WebSearchProvider)
|
||||
|
||||
def test_supports_search_only(self):
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
p = XAIWebSearchProvider()
|
||||
assert p.supports_search() is True
|
||||
assert p.supports_extract() is False
|
||||
assert p.supports_crawl() is False
|
||||
|
||||
def test_display_name(self):
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
assert "Grok" in XAIWebSearchProvider().display_name
|
||||
|
||||
|
||||
class TestXAIProviderIsAvailable:
|
||||
"""``is_available()`` MUST be cheap — no network, no token refresh, no
|
||||
auth-store lock. It runs on every ``hermes tools`` repaint and at
|
||||
tool-registration time, so any I/O regression here would surface as
|
||||
visible CLI latency.
|
||||
"""
|
||||
|
||||
def test_available_via_env_var(self, monkeypatch):
|
||||
monkeypatch.setenv("XAI_API_KEY", "sk-xai-test")
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
assert XAIWebSearchProvider().is_available() is True
|
||||
|
||||
def test_available_via_auth_store(self, monkeypatch, tmp_path):
|
||||
"""Cheap probe should detect xai-oauth tokens in ~/.hermes/auth.json
|
||||
without invoking the resolver (which can trigger refresh)."""
|
||||
monkeypatch.delenv("XAI_API_KEY", raising=False)
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
auth_path = tmp_path / "auth.json"
|
||||
auth_path.write_text(json.dumps({
|
||||
"version": 1,
|
||||
"providers": {
|
||||
"xai-oauth": {"tokens": {"access_token": "ya29.fake-access-token"}},
|
||||
},
|
||||
}))
|
||||
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
assert XAIWebSearchProvider().is_available() is True
|
||||
|
||||
def test_unavailable_when_no_env_and_no_auth_store(self, monkeypatch, tmp_path):
|
||||
monkeypatch.delenv("XAI_API_KEY", raising=False)
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
# No auth.json written.
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
assert XAIWebSearchProvider().is_available() is False
|
||||
|
||||
def test_unavailable_when_auth_store_has_empty_token(self, monkeypatch, tmp_path):
|
||||
monkeypatch.delenv("XAI_API_KEY", raising=False)
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
auth_path = tmp_path / "auth.json"
|
||||
auth_path.write_text(json.dumps({
|
||||
"version": 1,
|
||||
"providers": {"xai-oauth": {"tokens": {"access_token": ""}}},
|
||||
}))
|
||||
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
assert XAIWebSearchProvider().is_available() is False
|
||||
|
||||
def test_unavailable_when_auth_store_corrupted(self, monkeypatch, tmp_path):
|
||||
"""A malformed auth.json must not crash availability scans."""
|
||||
monkeypatch.delenv("XAI_API_KEY", raising=False)
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
(tmp_path / "auth.json").write_text("not json at all }{")
|
||||
|
||||
from plugins.web.xai.provider import XAIWebSearchProvider
|
||||
assert XAIWebSearchProvider().is_available() is False
|
||||
|
||||
def test_is_available_does_not_call_resolver(self, monkeypatch):
|
||||
"""Regression guard: ``is_available()`` must NEVER touch the resolver,
|
||||
because the OAuth resolver can trigger a network refresh."""
|
||||
monkeypatch.setenv("XAI_API_KEY", "sk-xai-test")
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
with patch.object(
|
||||
xai_provider, "resolve_xai_http_credentials",
|
||||
side_effect=AssertionError("is_available must not call the resolver"),
|
||||
):
|
||||
assert xai_provider.XAIWebSearchProvider().is_available() is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# search() happy + parse paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestXAIProviderSearchJSONPath:
|
||||
_GROK_JSON = json.dumps({
|
||||
"results": [
|
||||
{"title": "xAI", "url": "https://x.ai", "description": "The company."},
|
||||
{"title": "Grok docs", "url": "https://docs.x.ai", "description": "API reference."},
|
||||
{"title": "Grokipedia", "url": "https://grokipedia.com", "description": "Wiki."},
|
||||
]
|
||||
})
|
||||
|
||||
def test_happy_path_normalizes_results(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=_mock_resp(_responses_payload(self._GROK_JSON))):
|
||||
result = xai_provider.XAIWebSearchProvider().search("what is xai", limit=5)
|
||||
|
||||
assert result["success"] is True
|
||||
web = result["data"]["web"]
|
||||
assert len(web) == 3
|
||||
assert web[0] == {
|
||||
"title": "xAI",
|
||||
"url": "https://x.ai",
|
||||
"description": "The company.",
|
||||
"position": 1,
|
||||
}
|
||||
assert web[2]["position"] == 3
|
||||
|
||||
def test_limit_truncates_json_results(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=_mock_resp(_responses_payload(self._GROK_JSON))):
|
||||
result = xai_provider.XAIWebSearchProvider().search("x", limit=2)
|
||||
|
||||
assert result["success"] is True
|
||||
assert len(result["data"]["web"]) == 2
|
||||
|
||||
def test_parses_json_with_leading_prose(self):
|
||||
"""Reasoning models sometimes narrate before the JSON block; we tolerate it."""
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
text = "Here are the results:\n" + self._GROK_JSON
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=_mock_resp(_responses_payload(text))):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is True
|
||||
assert len(result["data"]["web"]) == 3
|
||||
|
||||
def test_drops_rows_without_url(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
bad_json = json.dumps({
|
||||
"results": [
|
||||
{"title": "no url", "description": "skip me"},
|
||||
{"title": "good", "url": "https://ok.com", "description": "keep"},
|
||||
]
|
||||
})
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=_mock_resp(_responses_payload(bad_json))):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is True
|
||||
web = result["data"]["web"]
|
||||
assert len(web) == 1
|
||||
assert web[0]["url"] == "https://ok.com"
|
||||
assert web[0]["position"] == 1
|
||||
|
||||
|
||||
class TestXAIProviderSearchFallbacks:
|
||||
def test_falls_back_to_annotations_when_json_missing(self):
|
||||
"""If Grok ignores the JSON instruction, derive results from url_citation annotations."""
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
body = "xAI is an AI company founded in 2023. They make Grok."
|
||||
annotations = [
|
||||
{
|
||||
"type": "url_citation",
|
||||
"url": "https://x.ai/about",
|
||||
"title": "1",
|
||||
"start_index": 4,
|
||||
"end_index": 9,
|
||||
},
|
||||
{
|
||||
"type": "url_citation",
|
||||
"url": "https://docs.x.ai",
|
||||
"title": "2",
|
||||
"start_index": 47,
|
||||
"end_index": 52,
|
||||
},
|
||||
]
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=_mock_resp(_responses_payload(body, annotations=annotations))):
|
||||
result = xai_provider.XAIWebSearchProvider().search("xai", limit=5)
|
||||
|
||||
assert result["success"] is True
|
||||
urls = [r["url"] for r in result["data"]["web"]]
|
||||
assert urls == ["https://x.ai/about", "https://docs.x.ai"]
|
||||
assert result["data"]["web"][0]["position"] == 1
|
||||
assert result["data"]["web"][1]["position"] == 2
|
||||
|
||||
def test_falls_back_to_citations_list(self):
|
||||
"""If no JSON and no annotations, derive from top-level citations list."""
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
payload = _responses_payload("free-form narration", citations=["https://a.com", "https://b.com"])
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=_mock_resp(payload)):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is True
|
||||
urls = [r["url"] for r in result["data"]["web"]]
|
||||
assert urls == ["https://a.com", "https://b.com"]
|
||||
|
||||
def test_annotations_without_url_citations_fall_through_to_citations(self):
|
||||
"""When annotations exist but none are url_citation type (e.g. future
|
||||
annotation types xAI may add), the citations list MUST still be
|
||||
consulted — otherwise we'd silently report success-with-no-rows
|
||||
and mask real data the API provided.
|
||||
"""
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
body = "Some narration about xAI."
|
||||
# Non-url_citation annotations only — the fallback shouldn't extract
|
||||
# any URLs from them, and must defer to the citations list below.
|
||||
annotations = [
|
||||
{"type": "future_citation_type", "url": "https://ignored.example", "title": "x"},
|
||||
]
|
||||
payload = _responses_payload(
|
||||
body,
|
||||
annotations=annotations,
|
||||
citations=["https://real-fallback.com"],
|
||||
)
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=_mock_resp(payload)):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is True
|
||||
urls = [r["url"] for r in result["data"]["web"]]
|
||||
assert urls == ["https://real-fallback.com"]
|
||||
|
||||
def test_empty_response_returns_empty_success(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
payload = _responses_payload("", citations=[])
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=_mock_resp(payload)):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["data"]["web"] == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Request payload shape
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestXAIProviderRequestShape:
|
||||
def test_posts_to_responses_endpoint_with_bearer_token(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
captured: dict = {}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
captured["url"] = url
|
||||
captured["headers"] = kwargs.get("headers", {})
|
||||
captured["json"] = kwargs.get("json", {})
|
||||
return _mock_resp(_responses_payload(json.dumps({"results": []})))
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds("secret-key")), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert captured["url"] == "https://api.x.ai/v1/responses"
|
||||
assert captured["headers"].get("Authorization") == "Bearer secret-key"
|
||||
body = captured["json"]
|
||||
# Assert against the module constant rather than the literal value,
|
||||
# so renaming DEFAULT_MODEL (when xAI deprecates grok-4.3) doesn't
|
||||
# turn this into a change-detector failure.
|
||||
assert body["model"] == xai_provider.DEFAULT_MODEL
|
||||
assert body["tools"] == [{"type": "web_search"}]
|
||||
assert body["input"][0]["role"] == "user"
|
||||
# No-inline-citations is opt-in via `include` per xAI Responses docs.
|
||||
assert "no_inline_citations" in body.get("include", [])
|
||||
|
||||
def test_honors_configured_model(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
captured: dict = {}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
captured["json"] = kwargs.get("json", {})
|
||||
return _mock_resp(_responses_payload(json.dumps({"results": []})))
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={"model": "grok-4.3-fast"}), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert captured["json"]["model"] == "grok-4.3-fast"
|
||||
|
||||
def test_allowed_domains_passes_through_as_filters(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
captured: dict = {}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
captured["json"] = kwargs.get("json", {})
|
||||
return _mock_resp(_responses_payload(json.dumps({"results": []})))
|
||||
|
||||
cfg = {"allowed_domains": ["x.ai", "grokipedia.com"]}
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value=cfg), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
tools = captured["json"]["tools"]
|
||||
assert tools == [{
|
||||
"type": "web_search",
|
||||
"filters": {"allowed_domains": ["x.ai", "grokipedia.com"]},
|
||||
}]
|
||||
|
||||
def test_excluded_domains_passes_through_as_filters(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
captured: dict = {}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
captured["json"] = kwargs.get("json", {})
|
||||
return _mock_resp(_responses_payload(json.dumps({"results": []})))
|
||||
|
||||
cfg = {"excluded_domains": ["spam.com"]}
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value=cfg), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
tools = captured["json"]["tools"]
|
||||
assert tools == [{
|
||||
"type": "web_search",
|
||||
"filters": {"excluded_domains": ["spam.com"]},
|
||||
}]
|
||||
|
||||
def test_allowed_domains_capped_at_five(self):
|
||||
"""xAI caps domain filters at 5; we trim silently to avoid 400s."""
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
captured: dict = {}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
captured["json"] = kwargs.get("json", {})
|
||||
return _mock_resp(_responses_payload(json.dumps({"results": []})))
|
||||
|
||||
cfg = {"allowed_domains": [f"d{i}.com" for i in range(10)]}
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value=cfg), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
domains = captured["json"]["tools"][0]["filters"]["allowed_domains"]
|
||||
assert len(domains) == 5
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Error paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestXAIProviderSearchErrors:
|
||||
def test_missing_creds_returns_failure(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds("")):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "xAI" in result["error"]
|
||||
|
||||
def test_mutually_exclusive_domain_filters_rejected_locally(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
cfg = {"allowed_domains": ["a.com"], "excluded_domains": ["b.com"]}
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value=cfg), \
|
||||
patch("httpx.post") as posted:
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "cannot both be set" in result["error"]
|
||||
posted.assert_not_called()
|
||||
|
||||
def test_http_error_returns_failure(self):
|
||||
import httpx
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
bad = MagicMock()
|
||||
bad.status_code = 429
|
||||
bad.text = "rate limited"
|
||||
err = httpx.HTTPStatusError("429", request=MagicMock(), response=bad)
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", side_effect=err):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "429" in result["error"]
|
||||
|
||||
def test_request_error_returns_failure(self):
|
||||
import httpx
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", side_effect=httpx.RequestError("boom")):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "boom" in result["error"] or "xAI" in result["error"]
|
||||
|
||||
def test_bad_json_response_returns_failure(self):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
bad = MagicMock()
|
||||
bad.status_code = 200
|
||||
bad.raise_for_status = MagicMock()
|
||||
bad.json.side_effect = ValueError("not json")
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=bad):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "JSON" in result["error"]
|
||||
|
||||
def test_401_on_oauth_path_triggers_force_refresh_and_retry(self):
|
||||
"""OAuth credentials → 401 must force-refresh and retry once.
|
||||
|
||||
Closes the two-gap scenario the resolver's JWT-exp shortcut doesn't
|
||||
cover: opaque (non-JWT) tokens and mid-window revocation. We expect
|
||||
``httpx.post`` to be called twice with two different Bearer tokens.
|
||||
"""
|
||||
import httpx
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
bad = MagicMock()
|
||||
bad.status_code = 401
|
||||
bad.text = "Unauthorized"
|
||||
unauthorized = httpx.HTTPStatusError("401", request=MagicMock(), response=bad)
|
||||
|
||||
calls = {"posts": [], "refresh_count": 0}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
calls["posts"].append(kwargs.get("headers", {}).get("Authorization"))
|
||||
if len(calls["posts"]) == 1:
|
||||
raise unauthorized
|
||||
return _mock_resp(_responses_payload(json.dumps({"results": []})))
|
||||
|
||||
def fake_resolve(*, force_refresh=False):
|
||||
if force_refresh:
|
||||
calls["refresh_count"] += 1
|
||||
return {
|
||||
"provider": "xai-oauth",
|
||||
"api_key": "fresh-after-refresh",
|
||||
"base_url": "https://api.x.ai/v1",
|
||||
}
|
||||
return {
|
||||
"provider": "xai-oauth",
|
||||
"api_key": "stale-token",
|
||||
"base_url": "https://api.x.ai/v1",
|
||||
}
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", side_effect=fake_resolve), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is True
|
||||
assert calls["refresh_count"] == 1
|
||||
assert calls["posts"] == ["Bearer stale-token", "Bearer fresh-after-refresh"]
|
||||
|
||||
def test_401_on_env_var_path_does_not_retry(self):
|
||||
"""Env-var (XAI_API_KEY) creds can't be refreshed — must not retry."""
|
||||
import httpx
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
bad = MagicMock()
|
||||
bad.status_code = 401
|
||||
bad.text = "Unauthorized"
|
||||
unauthorized = httpx.HTTPStatusError("401", request=MagicMock(), response=bad)
|
||||
|
||||
calls = {"posts": 0, "refreshed": False}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
calls["posts"] += 1
|
||||
raise unauthorized
|
||||
|
||||
def fake_resolve(*, force_refresh=False):
|
||||
if force_refresh:
|
||||
calls["refreshed"] = True
|
||||
# provider=="xai" signals env-var path; retry must be skipped.
|
||||
return {"provider": "xai", "api_key": "sk-env-var-key", "base_url": "https://api.x.ai/v1"}
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", side_effect=fake_resolve), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "401" in result["error"]
|
||||
assert calls["posts"] == 1
|
||||
assert calls["refreshed"] is False
|
||||
|
||||
def test_401_retry_gives_up_when_refresh_returns_same_token(self):
|
||||
"""If the force-refresh returns the same token (refresh-token also
|
||||
dead), don't loop — surface the 401 to the caller."""
|
||||
import httpx
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
bad = MagicMock()
|
||||
bad.status_code = 401
|
||||
bad.text = "Unauthorized"
|
||||
unauthorized = httpx.HTTPStatusError("401", request=MagicMock(), response=bad)
|
||||
|
||||
calls = {"posts": 0, "refresh_count": 0}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
calls["posts"] += 1
|
||||
raise unauthorized
|
||||
|
||||
def fake_resolve(*, force_refresh=False):
|
||||
if force_refresh:
|
||||
calls["refresh_count"] += 1
|
||||
return {
|
||||
"provider": "xai-oauth",
|
||||
"api_key": "same-dead-token",
|
||||
"base_url": "https://api.x.ai/v1",
|
||||
}
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", side_effect=fake_resolve), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "401" in result["error"]
|
||||
# One post, one force-refresh attempt, no second post.
|
||||
assert calls["posts"] == 1
|
||||
assert calls["refresh_count"] == 1
|
||||
|
||||
def test_non_401_http_error_is_not_retried(self):
|
||||
"""Only 401 is retryable — 429 / 500 / 503 must fail fast so the
|
||||
agent (or upstream rate-limiter) decides what to do."""
|
||||
import httpx
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
bad = MagicMock()
|
||||
bad.status_code = 500
|
||||
bad.text = "internal error"
|
||||
err = httpx.HTTPStatusError("500", request=MagicMock(), response=bad)
|
||||
|
||||
calls = {"posts": 0, "refreshed": False}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
calls["posts"] += 1
|
||||
raise err
|
||||
|
||||
def fake_resolve(*, force_refresh=False):
|
||||
if force_refresh:
|
||||
calls["refreshed"] = True
|
||||
return {"provider": "xai-oauth", "api_key": "tok", "base_url": "https://api.x.ai/v1"}
|
||||
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", side_effect=fake_resolve), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "500" in result["error"]
|
||||
assert calls["posts"] == 1
|
||||
assert calls["refreshed"] is False
|
||||
|
||||
def test_http_200_with_error_envelope_surfaces_failure(self):
|
||||
"""xAI sometimes returns 200 with ``{"error": {...}}`` (model
|
||||
overloaded, refusal, etc.). Must be surfaced as a failure rather
|
||||
than silently masked as success-with-empty-results.
|
||||
"""
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
payload = {"error": {"message": "model overloaded", "type": "server_error"}}
|
||||
with patch.object(xai_provider, "resolve_xai_http_credentials", return_value=_creds()), \
|
||||
patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", return_value=_mock_resp(payload)):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=5)
|
||||
|
||||
assert result["success"] is False
|
||||
assert "model overloaded" in result["error"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration with tools/web_tools.py backend wiring
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestXAIBackendWiring:
|
||||
def test_is_backend_available_true_via_env_var(self, monkeypatch):
|
||||
from tools import web_tools
|
||||
|
||||
monkeypatch.setenv("XAI_API_KEY", "sk-xai-test")
|
||||
assert web_tools._is_backend_available("xai") is True
|
||||
|
||||
def test_is_backend_available_false_when_no_creds(self, monkeypatch, tmp_path):
|
||||
from tools import web_tools
|
||||
|
||||
monkeypatch.delenv("XAI_API_KEY", raising=False)
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
assert web_tools._is_backend_available("xai") is False
|
||||
|
||||
def test_is_backend_available_does_not_call_resolver(self, monkeypatch):
|
||||
"""Regression guard — `_is_backend_available` runs on every web_search
|
||||
dispatch and every `hermes tools` repaint. It must not invoke the
|
||||
OAuth resolver (which can trigger a network refresh)."""
|
||||
from tools import web_tools
|
||||
|
||||
monkeypatch.setenv("XAI_API_KEY", "sk-xai-test")
|
||||
with patch(
|
||||
"tools.xai_http.resolve_xai_http_credentials",
|
||||
side_effect=AssertionError("must not call resolver"),
|
||||
):
|
||||
assert web_tools._is_backend_available("xai") is True
|
||||
|
||||
def test_configured_backend_xai_accepted(self, monkeypatch):
|
||||
from tools import web_tools
|
||||
monkeypatch.setattr(web_tools, "_load_web_config", lambda: {"backend": "xai"})
|
||||
assert web_tools._get_backend() == "xai"
|
||||
|
||||
def test_xai_not_in_legacy_backend_candidate_chain(self, monkeypatch):
|
||||
"""The hardcoded ``backend_candidates`` tuple in ``_get_backend()``
|
||||
does not include xAI — by design, since the no-config legacy
|
||||
chain is for users who set env vars but never ran ``hermes tools``,
|
||||
and we don't want a stray ``XAI_API_KEY`` (perhaps set for chat
|
||||
inference) to silently re-route web_search through Grok.
|
||||
|
||||
Note: this does NOT prevent the registry's single-provider
|
||||
shortcut (``agent.web_search_registry._resolve``) from selecting
|
||||
xAI when it's the only available web provider. That path is the
|
||||
normal "pick the one provider the user actually configured"
|
||||
behavior shared by every other backend.
|
||||
"""
|
||||
from tools import web_tools
|
||||
|
||||
monkeypatch.setattr(web_tools, "_load_web_config", lambda: {})
|
||||
for key in (
|
||||
"FIRECRAWL_API_KEY", "FIRECRAWL_API_URL", "PARALLEL_API_KEY",
|
||||
"TAVILY_API_KEY", "EXA_API_KEY", "SEARXNG_URL", "BRAVE_SEARCH_API_KEY",
|
||||
):
|
||||
monkeypatch.delenv(key, raising=False)
|
||||
monkeypatch.setenv("XAI_API_KEY", "xai-test-key")
|
||||
monkeypatch.setattr(web_tools, "_is_tool_gateway_ready", lambda: False)
|
||||
monkeypatch.setattr(web_tools, "_ddgs_package_importable", lambda: False)
|
||||
assert web_tools._get_backend() != "xai"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# OAuth credential resolution (end-to-end through tools.xai_http)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestXAIProviderOAuthPath:
|
||||
"""Verifies the provider works when credentials come from the OAuth
|
||||
runtime resolver (``hermes auth`` sign-in) rather than an env-var key.
|
||||
Patches at the ``hermes_cli.runtime_provider.resolve_runtime_provider``
|
||||
boundary so the full ``tools.xai_http.resolve_xai_http_credentials``
|
||||
chain is exercised end-to-end.
|
||||
"""
|
||||
|
||||
def test_search_uses_oauth_bearer_token_and_base_url(self, monkeypatch):
|
||||
from plugins.web.xai import provider as xai_provider
|
||||
|
||||
# Force the env-var fallback to fail so resolution must go via OAuth.
|
||||
monkeypatch.delenv("XAI_API_KEY", raising=False)
|
||||
|
||||
oauth_runtime = {
|
||||
"provider": "xai-oauth",
|
||||
"api_mode": "codex_responses",
|
||||
"base_url": "https://api.x.ai/v1",
|
||||
"api_key": "ya29.fake-oauth-access-token",
|
||||
"source": "hermes-auth-store",
|
||||
}
|
||||
|
||||
captured: dict = {}
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
captured["url"] = url
|
||||
captured["headers"] = kwargs.get("headers", {})
|
||||
return _mock_resp(_responses_payload(json.dumps({"results": []})))
|
||||
|
||||
with patch(
|
||||
"hermes_cli.runtime_provider.resolve_runtime_provider",
|
||||
return_value=oauth_runtime,
|
||||
), patch.object(xai_provider, "_load_xai_web_config", return_value={}), \
|
||||
patch("httpx.post", side_effect=fake_post):
|
||||
result = xai_provider.XAIWebSearchProvider().search("q", limit=3)
|
||||
|
||||
assert result["success"] is True
|
||||
assert captured["url"] == "https://api.x.ai/v1/responses"
|
||||
assert captured["headers"].get("Authorization") == "Bearer ya29.fake-oauth-access-token"
|
||||
+11
-1
@@ -140,7 +140,7 @@ def _get_backend() -> str:
|
||||
keys manually without running setup.
|
||||
"""
|
||||
configured = (_load_web_config().get("backend") or "").lower().strip()
|
||||
if configured in {"parallel", "firecrawl", "tavily", "exa", "searxng", "brave-free", "ddgs"}:
|
||||
if configured in {"parallel", "firecrawl", "tavily", "exa", "searxng", "brave-free", "ddgs", "xai"}:
|
||||
return configured
|
||||
|
||||
# Fallback for manual / legacy config — pick the highest-priority
|
||||
@@ -218,6 +218,16 @@ def _is_backend_available(backend: str) -> bool:
|
||||
return _has_env("BRAVE_SEARCH_API_KEY")
|
||||
if backend == "ddgs":
|
||||
return _ddgs_package_importable()
|
||||
if backend == "xai":
|
||||
# Cheap probe — env var OR auth.json has OAuth tokens. Must not
|
||||
# call resolve_xai_http_credentials() here because the OAuth path
|
||||
# can trigger a network token refresh, and _is_backend_available
|
||||
# runs on every web_search dispatch + every `hermes tools` repaint.
|
||||
try:
|
||||
from tools.xai_http import has_xai_credentials
|
||||
return has_xai_credentials()
|
||||
except Exception:
|
||||
return False
|
||||
return False
|
||||
|
||||
|
||||
|
||||
+63
-16
@@ -2,9 +2,49 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Dict
|
||||
|
||||
|
||||
def has_xai_credentials() -> bool:
|
||||
"""Cheap probe — return True when xAI credentials are *likely* usable.
|
||||
|
||||
Deliberately avoids :func:`resolve_xai_http_credentials` so callers in
|
||||
hot-paint paths (``hermes tools`` repaint, tool-registration scans,
|
||||
``WebSearchProvider.is_available()``) don't incur disk locks or — in
|
||||
the OAuth path — a network token refresh. The ABC contract on
|
||||
:meth:`agent.web_search_provider.WebSearchProvider.is_available`
|
||||
explicitly forbids network calls for exactly this reason.
|
||||
|
||||
Resolution order, fast-to-slow:
|
||||
|
||||
1. ``XAI_API_KEY`` env var (cheapest; covers explicit-key users).
|
||||
2. ``~/.hermes/auth.json`` has a non-empty ``providers.xai-oauth.tokens.access_token``
|
||||
(single file read, no expiry check, no refresh).
|
||||
|
||||
Returns False on any exception so a corrupted auth store can't block
|
||||
other availability scans. Truthful refresh + expiry handling happens
|
||||
in ``search()`` (or whichever caller actually makes the request).
|
||||
"""
|
||||
if os.environ.get("XAI_API_KEY", "").strip():
|
||||
return True
|
||||
try:
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
auth_path = get_hermes_home() / "auth.json"
|
||||
if not auth_path.exists():
|
||||
return False
|
||||
store = json.loads(auth_path.read_text())
|
||||
providers = store.get("providers") if isinstance(store, dict) else None
|
||||
xai_state = providers.get("xai-oauth") if isinstance(providers, dict) else None
|
||||
tokens = xai_state.get("tokens") if isinstance(xai_state, dict) else None
|
||||
access_token = tokens.get("access_token") if isinstance(tokens, dict) else None
|
||||
return bool(str(access_token or "").strip())
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def get_env_value(name: str, default=None):
|
||||
"""Read ``name`` from ``~/.hermes/.env`` first, then ``os.environ``.
|
||||
|
||||
@@ -32,7 +72,7 @@ def hermes_xai_user_agent() -> str:
|
||||
return f"Hermes-Agent/{__version__}"
|
||||
|
||||
|
||||
def resolve_xai_http_credentials() -> Dict[str, str]:
|
||||
def resolve_xai_http_credentials(*, force_refresh: bool = False) -> Dict[str, str]:
|
||||
"""Resolve bearer credentials for direct xAI HTTP endpoints.
|
||||
|
||||
Prefers Hermes-managed xAI OAuth credentials when available, then falls back
|
||||
@@ -41,26 +81,33 @@ def resolve_xai_http_credentials() -> Dict[str, str]:
|
||||
not just ones already exported into ``os.environ``. This keeps direct xAI
|
||||
endpoints (images, TTS, STT, etc.) aligned with the main runtime auth model
|
||||
and preserves the regression contract from PR #17140 / #17163.
|
||||
"""
|
||||
try:
|
||||
from hermes_cli.runtime_provider import resolve_runtime_provider
|
||||
|
||||
runtime = resolve_runtime_provider(requested="xai-oauth")
|
||||
access_token = str(runtime.get("api_key") or "").strip()
|
||||
base_url = str(runtime.get("base_url") or "").strip().rstrip("/")
|
||||
if access_token:
|
||||
return {
|
||||
"provider": "xai-oauth",
|
||||
"api_key": access_token,
|
||||
"base_url": base_url or "https://api.x.ai/v1",
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
Set ``force_refresh=True`` to bypass the resolver's JWT-exp shortcut and
|
||||
perform an unconditional OAuth refresh. Callers should use this only as a
|
||||
reactive remediation after a server 401 (mid-window revocation, opaque
|
||||
tokens where the proactive JWT check is a no-op, etc.), not as a default —
|
||||
the auth-store lock is held for the duration of the refresh.
|
||||
"""
|
||||
if not force_refresh:
|
||||
try:
|
||||
from hermes_cli.runtime_provider import resolve_runtime_provider
|
||||
|
||||
runtime = resolve_runtime_provider(requested="xai-oauth")
|
||||
access_token = str(runtime.get("api_key") or "").strip()
|
||||
base_url = str(runtime.get("base_url") or "").strip().rstrip("/")
|
||||
if access_token:
|
||||
return {
|
||||
"provider": "xai-oauth",
|
||||
"api_key": access_token,
|
||||
"base_url": base_url or "https://api.x.ai/v1",
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
from hermes_cli.auth import resolve_xai_oauth_runtime_credentials
|
||||
|
||||
creds = resolve_xai_oauth_runtime_credentials()
|
||||
creds = resolve_xai_oauth_runtime_credentials(force_refresh=force_refresh)
|
||||
access_token = str(creds.get("api_key") or "").strip()
|
||||
base_url = str(creds.get("base_url") or "").strip().rstrip("/")
|
||||
if access_token:
|
||||
|
||||
Reference in New Issue
Block a user