feat(claude): add camofox_mcp stdio MCP server

Python MCP server wrapping the Camofox browser HTTP API on lthn.sh.
Exposes navigate/read_page/screenshot/click/fill/close_tab as MCP tools
so any Claude Code session can drive Camofox via `claude mcp add camofox`.

Closes tasks.lthn.sh/view.php?id=77
Co-authored-by: Codex <noreply@openai.com>

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-04-23 17:43:05 +01:00
parent 124fa6e6d7
commit b5783a16f2
6 changed files with 997 additions and 0 deletions

View file

@ -0,0 +1,39 @@
<!-- SPDX-License-Identifier: EUPL-1.2 -->
# camofox-mcp
`camofox-mcp` is a stdio MCP server that wraps the Camofox browser HTTP API for Claude Code.
## Install
Local editable install:
```bash
cd claude/camofox_mcp
pip install -e .
```
Direct git install:
```bash
pip install "git+https://github.com/dAppCore/core-agent.git#subdirectory=claude/camofox_mcp"
```
## Claude Code
```bash
claude mcp add camofox -- camofox-mcp --camofox-url=http://localhost:8099 --api-key=$CAMOFOX_API_KEY
```
If `--api-key` is omitted, the server will read `CAMOFOX_API_KEY` from the environment.
## Tools
- `navigate(url)` opens a new tab and returns `{tab_id, status}`
- `read_page(tab_id)` returns `{text, url, title}`
- `screenshot(tab_id)` returns `{image_b64}`
- `click(tab_id, selector)` returns `{ok}`
- `fill(tab_id, selector, value)` returns `{ok}`
- `close_tab(tab_id)` returns `{ok}`
The server prefers the official Python `mcp` SDK when it is importable. If that package is unavailable at runtime, it falls back to a small stdio JSON-RPC MCP implementation that supports `initialize`, `tools/list`, and `tools/call`.

View file

@ -0,0 +1,7 @@
# SPDX-License-Identifier: EUPL-1.2
"""Camofox MCP server package."""
__all__ = ["__version__"]
__version__ = "0.1.0"

View file

@ -0,0 +1,26 @@
# SPDX-License-Identifier: EUPL-1.2
[build-system]
requires = ["setuptools>=69", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "camofox-mcp"
version = "0.1.0"
description = "MCP stdio server exposing the Camofox browser HTTP API"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"mcp>=1.0.0",
"httpx>=0.28.0",
"pydantic>=2.0.0",
]
[project.scripts]
camofox-mcp = "camofox_mcp.server:main"
[tool.setuptools]
packages = ["camofox_mcp"]
[tool.setuptools.package-dir]
camofox_mcp = "."

View file

@ -0,0 +1,674 @@
# SPDX-License-Identifier: EUPL-1.2
from __future__ import annotations
import argparse
import base64
import json
import logging
import os
import signal
import sys
import threading
from collections.abc import Mapping
from typing import Any, BinaryIO
import httpx
from pydantic import BaseModel, ConfigDict, Field, ValidationError
from . import __version__
try:
import asyncio
import mcp.server.stdio as mcp_stdio
import mcp.types as mcp_types
from mcp.server.lowlevel import NotificationOptions, Server as McpServer
from mcp.server.models import InitializationOptions
MCP_SDK_AVAILABLE = True
except ImportError: # pragma: no cover - exercised implicitly in local sandbox.
asyncio = None
mcp_stdio = None
mcp_types = None
NotificationOptions = None
McpServer = None
InitializationOptions = None
MCP_SDK_AVAILABLE = False
LOGGER = logging.getLogger("camofox_mcp")
SERVER_NAME = "camofox-mcp"
DEFAULT_CAMOFOX_URL = "http://localhost:8099/"
SUPPORTED_PROTOCOL_VERSIONS = ("2025-11-25", "2025-06-18", "2025-03-26")
READ_PAGE_EXPRESSION = (
"(() => {"
"const root = document.body ?? document.documentElement;"
"return {"
"title: document.title ?? '',"
"url: window.location.href ?? '',"
"text: root ? (root.innerText ?? root.textContent ?? '') : ''"
"};"
"})()"
)
class JsonRpcError(Exception):
"""JSON-RPC protocol error."""
def __init__(self, code: int, message: str, data: Any | None = None) -> None:
super().__init__(message)
self.code = code
self.message = message
self.data = data
class ToolExecutionError(Exception):
"""Tool execution failure that should be surfaced as an MCP tool error."""
class NavigateArgs(BaseModel):
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
url: str = Field(min_length=1)
class TabArgs(BaseModel):
model_config = ConfigDict(extra="forbid")
tab_id: int = Field(ge=1)
class ClickArgs(TabArgs):
model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)
selector: str = Field(min_length=1)
class FillArgs(ClickArgs):
value: str
class ToolDefinition:
"""Simple tool registry entry."""
def __init__(self, name: str, description: str, model: type[BaseModel], handler_name: str) -> None:
self.name = name
self.description = description
self.model = model
self.handler_name = handler_name
def schema(self) -> dict[str, Any]:
return {
"name": self.name,
"description": self.description,
"inputSchema": self.model.model_json_schema(),
}
class CamofoxClient:
"""Thin HTTP wrapper around the Camofox browser server."""
def __init__(
self,
base_url: str,
api_key: str | None = None,
*,
user_id: str | None = None,
session_key: str | None = None,
client: httpx.Client | None = None,
) -> None:
pid = os.getpid()
self.user_id = user_id or os.getenv("CAMOFOX_USER_ID") or f"claude-code-{pid}"
self.session_key = session_key or f"claude-code-{pid}"
self.api_key = api_key
self.base_url = base_url.rstrip("/") or base_url
headers = {"Accept": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
self._client = client or httpx.Client(base_url=self.base_url, headers=headers, timeout=30.0)
def request_json(
self,
method: str,
path: str,
*,
json_body: dict[str, Any] | None = None,
params: dict[str, Any] | None = None,
) -> dict[str, Any]:
try:
response = self._client.request(
method,
path,
json=json_body,
params=params,
headers=self._request_headers(),
)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
detail = exc.response.text.strip() or str(exc)
raise ToolExecutionError(f"Camofox API returned HTTP {exc.response.status_code}: {detail}") from exc
except httpx.HTTPError as exc:
raise ToolExecutionError(f"Camofox API request failed: {exc}") from exc
if not response.content:
return {}
try:
return response.json()
except ValueError as exc:
raise ToolExecutionError(f"Camofox API returned invalid JSON for {method} {path}") from exc
def request_bytes(self, path: str, *, params: dict[str, Any] | None = None) -> bytes:
try:
response = self._client.request("GET", path, params=params, headers=self._request_headers())
response.raise_for_status()
except httpx.HTTPStatusError as exc:
detail = exc.response.text.strip() or str(exc)
raise ToolExecutionError(f"Camofox API returned HTTP {exc.response.status_code}: {detail}") from exc
except httpx.HTTPError as exc:
raise ToolExecutionError(f"Camofox API request failed: {exc}") from exc
return response.content
def close(self) -> None:
self._client.close()
def _request_headers(self) -> dict[str, str]:
headers = {"Accept": "application/json"}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
return headers
class CamofoxMcpApplication:
"""Business logic for the six exposed MCP tools."""
TOOLS = (
ToolDefinition("navigate", "Open a URL in a new Camofox tab.", NavigateArgs, "_navigate"),
ToolDefinition("read_page", "Read the current page text, URL, and title for a tab.", TabArgs, "_read_page"),
ToolDefinition("screenshot", "Capture a PNG screenshot for a tab and return it as base64.", TabArgs, "_screenshot"),
ToolDefinition("click", "Click a CSS selector inside a tab.", ClickArgs, "_click"),
ToolDefinition("fill", "Fill a CSS selector inside a tab with text.", FillArgs, "_fill"),
ToolDefinition("close_tab", "Close a tab.", TabArgs, "_close_tab"),
)
def __init__(self, client: CamofoxClient) -> None:
self.client = client
self._lock = threading.Lock()
self._next_tab_id = 1
self._remote_tabs: dict[int, str] = {}
self._tool_map = {tool.name: tool for tool in self.TOOLS}
def close(self) -> None:
self.client.close()
def tool_schemas(self) -> list[dict[str, Any]]:
return [tool.schema() for tool in self.TOOLS]
def dispatch_tool(self, name: str, arguments: Mapping[str, Any] | None) -> dict[str, Any]:
tool = self._tool_map.get(name)
if tool is None:
raise JsonRpcError(-32601, f"Unknown tool: {name}")
try:
validated = tool.model.model_validate(arguments or {})
except ValidationError as exc:
raise JsonRpcError(-32602, exc.json(include_url=False)) from exc
handler = getattr(self, tool.handler_name)
return handler(validated)
def _navigate(self, args: NavigateArgs) -> dict[str, Any]:
response = self.client.request_json(
"POST",
"/tabs",
json_body={
"userId": self.client.user_id,
"sessionKey": self.client.session_key,
"url": args.url,
},
)
remote_tab_id = self._extract_remote_tab_id(response)
local_tab_id = self._register_tab(remote_tab_id)
status = str(self._extract_first(response, ("status", "state", "message")) or "ok")
try:
wait_response = self.client.request_json(
"POST",
f"/tabs/{remote_tab_id}/wait",
json_body={"userId": self.client.user_id},
)
except ToolExecutionError as exc:
LOGGER.debug("navigate wait failed for tab %s: %s", remote_tab_id, exc)
else:
status = str(self._extract_first(wait_response, ("status", "state", "message")) or "ok")
return {"tab_id": local_tab_id, "status": status}
def _read_page(self, args: TabArgs) -> dict[str, Any]:
remote_tab_id = self._resolve_tab(args.tab_id)
try:
response = self.client.request_json(
"POST",
f"/tabs/{remote_tab_id}/evaluate",
json_body={
"userId": self.client.user_id,
"expression": READ_PAGE_EXPRESSION,
},
)
result = response.get("result", response)
if not isinstance(result, Mapping):
raise ToolExecutionError("Camofox evaluate response did not contain page metadata")
return {
"text": str(result.get("text", "")),
"url": str(result.get("url", "")),
"title": str(result.get("title", "")),
}
except ToolExecutionError as exc:
LOGGER.debug("read_page evaluate failed for tab %s: %s", remote_tab_id, exc)
snapshot = self.client.request_json(
"GET",
f"/tabs/{remote_tab_id}/snapshot",
params={"userId": self.client.user_id},
)
return {
"text": str(self._extract_first(snapshot, ("snapshot", "text")) or ""),
"url": str(self._extract_first(snapshot, ("url",)) or ""),
"title": str(self._extract_first(snapshot, ("title", "pageTitle")) or ""),
}
def _screenshot(self, args: TabArgs) -> dict[str, Any]:
remote_tab_id = self._resolve_tab(args.tab_id)
image = self.client.request_bytes(
f"/tabs/{remote_tab_id}/screenshot",
params={"userId": self.client.user_id, "fullPage": "true"},
)
return {"image_b64": base64.b64encode(image).decode("ascii")}
def _click(self, args: ClickArgs) -> dict[str, Any]:
remote_tab_id = self._resolve_tab(args.tab_id)
response = self.client.request_json(
"POST",
f"/tabs/{remote_tab_id}/click",
json_body={"userId": self.client.user_id, "selector": args.selector},
)
return {"ok": self._extract_ok(response)}
def _fill(self, args: FillArgs) -> dict[str, Any]:
remote_tab_id = self._resolve_tab(args.tab_id)
response = self.client.request_json(
"POST",
f"/tabs/{remote_tab_id}/type",
json_body={
"userId": self.client.user_id,
"selector": args.selector,
"text": args.value,
},
)
return {"ok": self._extract_ok(response)}
def _close_tab(self, args: TabArgs) -> dict[str, Any]:
remote_tab_id = self._resolve_tab(args.tab_id)
response = self.client.request_json(
"DELETE",
f"/tabs/{remote_tab_id}",
json_body={"userId": self.client.user_id},
)
with self._lock:
self._remote_tabs.pop(args.tab_id, None)
return {"ok": self._extract_ok(response)}
def _register_tab(self, remote_tab_id: str) -> int:
with self._lock:
local_tab_id = self._next_tab_id
self._next_tab_id += 1
self._remote_tabs[local_tab_id] = remote_tab_id
return local_tab_id
def _resolve_tab(self, local_tab_id: int) -> str:
with self._lock:
remote_tab_id = self._remote_tabs.get(local_tab_id)
if remote_tab_id is None:
raise ToolExecutionError(f"Unknown tab_id: {local_tab_id}")
return remote_tab_id
@staticmethod
def _extract_ok(response: Mapping[str, Any]) -> bool:
value = response.get("ok")
if isinstance(value, bool):
return value
success = response.get("success")
if isinstance(success, bool):
return success
return True
@classmethod
def _extract_remote_tab_id(cls, payload: Any) -> str:
value = cls._extract_first(payload, ("tabId", "tab_id", "targetId", "target_id", "id"))
if value in (None, ""):
raise ToolExecutionError("Camofox did not return a tab identifier")
return str(value)
@classmethod
def _extract_first(cls, payload: Any, keys: tuple[str, ...]) -> Any | None:
if isinstance(payload, Mapping):
for key in keys:
if key in payload and payload[key] not in (None, ""):
return payload[key]
for value in payload.values():
found = cls._extract_first(value, keys)
if found not in (None, ""):
return found
return None
if isinstance(payload, list):
for item in payload:
found = cls._extract_first(item, keys)
if found not in (None, ""):
return found
return None
class MinimalStdioMcpServer:
"""Small MCP stdio server used when the official SDK is unavailable."""
def __init__(self, app: CamofoxMcpApplication) -> None:
self.app = app
self.protocol_version = SUPPORTED_PROTOCOL_VERSIONS[0]
self.initialised = False
self._output_framing = "line"
def serve(self, reader: BinaryIO, writer: BinaryIO) -> None:
while True:
payload, framing = read_stdio_message(reader)
if payload is None:
break
self._output_framing = framing or self._output_framing
try:
message = json.loads(payload)
except json.JSONDecodeError as exc:
response = jsonrpc_error(None, -32700, f"Parse error: {exc.msg}")
write_stdio_message(writer, response, framing=self._output_framing)
continue
response = self.handle_message(message)
if response is None:
continue
write_stdio_message(writer, response, framing=self._output_framing)
def handle_message(self, message: Any) -> dict[str, Any] | list[dict[str, Any]] | None:
if isinstance(message, list):
if not message:
return jsonrpc_error(None, -32600, "Invalid Request")
responses: list[dict[str, Any]] = []
for item in message:
response = self._handle_single(item)
if response is not None:
responses.append(response)
return responses or None
return self._handle_single(message)
def _handle_single(self, message: Any) -> dict[str, Any] | None:
if not isinstance(message, Mapping):
return jsonrpc_error(None, -32600, "Invalid Request")
request_id = message.get("id")
method = message.get("method")
if not isinstance(method, str):
return jsonrpc_error(request_id, -32600, "Invalid Request")
try:
result = self._dispatch(method, message.get("params"))
except JsonRpcError as exc:
return jsonrpc_error(request_id, exc.code, exc.message, exc.data)
except ToolExecutionError as exc:
if request_id is None:
return None
return jsonrpc_success(request_id, self._tool_error_result(str(exc)))
except Exception as exc: # pragma: no cover - defensive guard.
LOGGER.exception("Unexpected MCP server failure")
return jsonrpc_error(request_id, -32603, f"Internal error: {exc}")
if request_id is None:
return None
return jsonrpc_success(request_id, result)
def _dispatch(self, method: str, params: Any) -> dict[str, Any]:
if method == "initialize":
if not isinstance(params, Mapping):
raise JsonRpcError(-32602, "initialize requires params")
requested = params.get("protocolVersion")
self.protocol_version = negotiate_protocol_version(requested)
return {
"protocolVersion": self.protocol_version,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": SERVER_NAME, "version": __version__},
"instructions": "Expose the Camofox browser HTTP API as MCP tools for Claude Code.",
}
if method == "notifications/initialized":
self.initialised = True
return {}
if method == "ping":
return {}
if not self.initialised:
raise JsonRpcError(-32002, "Server has not completed initialization")
if method == "tools/list":
return {"tools": self.app.tool_schemas()}
if method == "tools/call":
if not isinstance(params, Mapping):
raise JsonRpcError(-32602, "tools/call requires params")
name = params.get("name")
arguments = params.get("arguments")
if not isinstance(name, str):
raise JsonRpcError(-32602, "tools/call requires a tool name")
result = self.app.dispatch_tool(name, arguments if isinstance(arguments, Mapping) else {})
return self._tool_success_result(result)
if method == "resources/list":
return {"resources": []}
if method == "resources/templates/list":
return {"resourceTemplates": []}
if method == "prompts/list":
return {"prompts": []}
if method == "logging/setLevel":
return {}
raise JsonRpcError(-32601, f"Method not found: {method}")
def _tool_success_result(self, result: dict[str, Any]) -> dict[str, Any]:
payload = {
"content": [{"type": "text", "text": json.dumps(result, ensure_ascii=False, separators=(",", ":"))}],
"isError": False,
}
if self.protocol_version >= "2025-06-18":
payload["structuredContent"] = result
return payload
@staticmethod
def _tool_error_result(message: str) -> dict[str, Any]:
return {"content": [{"type": "text", "text": message}], "isError": True}
def build_argument_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Camofox MCP stdio server")
parser.add_argument("--camofox-url", default=DEFAULT_CAMOFOX_URL, help="Base URL for the Camofox HTTP API")
parser.add_argument(
"--api-key",
default=os.getenv("CAMOFOX_API_KEY"),
help="Bearer token for the Camofox API (defaults to CAMOFOX_API_KEY)",
)
return parser
def configure_logging() -> None:
logging.basicConfig(
level=os.getenv("CAMOFOX_LOG_LEVEL", "INFO").upper(),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
stream=sys.stderr,
)
def install_signal_handlers() -> None:
def _handle_signal(signum: int, _frame: Any) -> None:
LOGGER.info("Received signal %s, shutting down", signum)
raise KeyboardInterrupt
signal.signal(signal.SIGINT, _handle_signal)
signal.signal(signal.SIGTERM, _handle_signal)
def negotiate_protocol_version(requested: Any) -> str:
if isinstance(requested, str) and requested in SUPPORTED_PROTOCOL_VERSIONS:
return requested
return SUPPORTED_PROTOCOL_VERSIONS[0]
def jsonrpc_success(request_id: Any, result: dict[str, Any]) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": request_id, "result": result}
def jsonrpc_error(request_id: Any, code: int, message: str, data: Any | None = None) -> dict[str, Any]:
error = {"code": code, "message": message}
if data is not None:
error["data"] = data
return {"jsonrpc": "2.0", "id": request_id, "error": error}
def read_stdio_message(reader: BinaryIO) -> tuple[str | None, str | None]:
while True:
line = reader.readline()
if line == b"":
return None, None
if line in (b"\n", b"\r\n"):
continue
if line.lower().startswith(b"content-length:"):
try:
content_length = int(line.split(b":", 1)[1].strip())
except (IndexError, ValueError) as exc:
raise JsonRpcError(-32700, f"Invalid Content-Length header: {line!r}") from exc
while True:
header = reader.readline()
if header == b"":
return None, "content-length"
if header in (b"\n", b"\r\n"):
break
payload = reader.read(content_length)
if len(payload) != content_length:
return None, "content-length"
return payload.decode("utf-8"), "content-length"
return line.strip().decode("utf-8"), "line"
def write_stdio_message(writer: BinaryIO, message: Any, *, framing: str = "line") -> None:
payload = json.dumps(message, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
if framing == "content-length":
writer.write(f"Content-Length: {len(payload)}\r\n\r\n".encode("ascii"))
writer.write(payload)
else:
writer.write(payload + b"\n")
flush = getattr(writer, "flush", None)
if callable(flush):
flush()
async def run_sdk_server(app: CamofoxMcpApplication) -> None:
if not MCP_SDK_AVAILABLE or asyncio is None or mcp_stdio is None or mcp_types is None:
raise RuntimeError("MCP SDK is not available")
server = McpServer(SERVER_NAME)
@server.list_tools()
async def _list_tools() -> list[Any]:
return [
mcp_types.Tool(
name=tool["name"],
description=tool["description"],
inputSchema=tool["inputSchema"],
)
for tool in app.tool_schemas()
]
@server.call_tool()
async def _call_tool(name: str, arguments: dict[str, Any] | None) -> Any:
try:
result = app.dispatch_tool(name, arguments or {})
return mcp_types.CallToolResult(
content=[
mcp_types.TextContent(
type="text",
text=json.dumps(result, ensure_ascii=False, separators=(",", ":")),
)
],
structuredContent=result,
isError=False,
)
except (JsonRpcError, ToolExecutionError) as exc:
return mcp_types.CallToolResult(
content=[mcp_types.TextContent(type="text", text=str(exc))],
isError=True,
)
async with mcp_stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=SERVER_NAME,
server_version=__version__,
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
def run_fallback_server(app: CamofoxMcpApplication) -> None:
server = MinimalStdioMcpServer(app)
server.serve(sys.stdin.buffer, sys.stdout.buffer)
def main(argv: list[str] | None = None) -> int:
configure_logging()
install_signal_handlers()
args = build_argument_parser().parse_args(argv)
app = CamofoxMcpApplication(CamofoxClient(args.camofox_url, args.api_key))
try:
LOGGER.info(
"Starting %s against %s using %s transport",
SERVER_NAME,
args.camofox_url,
"official MCP SDK" if MCP_SDK_AVAILABLE else "fallback JSON-RPC",
)
if MCP_SDK_AVAILABLE:
asyncio.run(run_sdk_server(app))
else:
run_fallback_server(app)
except KeyboardInterrupt:
LOGGER.info("Shutdown requested")
finally:
app.close()
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main())

View file

@ -0,0 +1 @@
# SPDX-License-Identifier: EUPL-1.2

View file

@ -0,0 +1,250 @@
# SPDX-License-Identifier: EUPL-1.2
from __future__ import annotations
import io
import json
import unittest
import httpx
from camofox_mcp.server import (
CamofoxClient,
CamofoxMcpApplication,
MinimalStdioMcpServer,
read_stdio_message,
)
def make_response(request: httpx.Request, payload: object, status_code: int = 200) -> httpx.Response:
return httpx.Response(status_code, json=payload, request=request)
def make_bytes_response(request: httpx.Request, payload: bytes, status_code: int = 200) -> httpx.Response:
return httpx.Response(status_code, content=payload, request=request)
class CamofoxMcpApplicationTests(unittest.TestCase):
def make_app(self, handler) -> CamofoxMcpApplication:
transport = httpx.MockTransport(handler)
client = httpx.Client(transport=transport, base_url="http://camofox.local")
camofox = CamofoxClient(
"http://camofox.local",
"secret-token",
user_id="agent1",
session_key="session1",
client=client,
)
self.addCleanup(camofox.close)
return CamofoxMcpApplication(camofox)
def test_navigate_returns_local_tab_handle_and_status(self) -> None:
calls: list[tuple[str, str, dict[str, object], str | None]] = []
def handler(request: httpx.Request) -> httpx.Response:
body = json.loads(request.content.decode("utf-8")) if request.content else {}
calls.append(
(
request.method,
request.url.path,
body,
request.headers.get("Authorization"),
)
)
if request.url.path == "/tabs":
return make_response(request, {"tabId": "remote-abc", "status": "created"})
if request.url.path == "/tabs/remote-abc/wait":
return make_response(request, {"ok": True})
raise AssertionError(f"unexpected request: {request.method} {request.url}")
app = self.make_app(handler)
result = app.dispatch_tool("navigate", {"url": "https://example.com"})
self.assertEqual(result, {"tab_id": 1, "status": "ok"})
self.assertEqual(
calls,
[
(
"POST",
"/tabs",
{"userId": "agent1", "sessionKey": "session1", "url": "https://example.com"},
"Bearer secret-token",
),
("POST", "/tabs/remote-abc/wait", {"userId": "agent1"}, "Bearer secret-token"),
],
)
def test_read_page_uses_evaluate_endpoint(self) -> None:
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/tabs":
return make_response(request, {"tabId": "remote-abc"})
if request.url.path == "/tabs/remote-abc/wait":
return make_response(request, {"ok": True})
if request.url.path == "/tabs/remote-abc/evaluate":
payload = json.loads(request.content.decode("utf-8"))
self.assertEqual(payload["userId"], "agent1")
self.assertIn("document.title", payload["expression"])
return make_response(
request,
{
"ok": True,
"result": {
"text": "Hello world",
"url": "https://example.com",
"title": "Example Domain",
},
},
)
raise AssertionError(f"unexpected request: {request.method} {request.url}")
app = self.make_app(handler)
navigate = app.dispatch_tool("navigate", {"url": "https://example.com"})
result = app.dispatch_tool("read_page", {"tab_id": navigate["tab_id"]})
self.assertEqual(
result,
{
"text": "Hello world",
"url": "https://example.com",
"title": "Example Domain",
},
)
def test_screenshot_base64_encodes_png(self) -> None:
image = b"\x89PNG\r\n\x1a\nmock"
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/tabs":
return make_response(request, {"tabId": "remote-abc"})
if request.url.path == "/tabs/remote-abc/wait":
return make_response(request, {"ok": True})
if request.url.path == "/tabs/remote-abc/screenshot":
self.assertEqual(request.url.params["userId"], "agent1")
self.assertEqual(request.url.params["fullPage"], "true")
return make_bytes_response(request, image)
raise AssertionError(f"unexpected request: {request.method} {request.url}")
app = self.make_app(handler)
navigate = app.dispatch_tool("navigate", {"url": "https://example.com"})
result = app.dispatch_tool("screenshot", {"tab_id": navigate["tab_id"]})
self.assertEqual(result, {"image_b64": "iVBORw0KGgptb2Nr"})
def test_click_posts_selector(self) -> None:
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/tabs":
return make_response(request, {"tabId": "remote-abc"})
if request.url.path == "/tabs/remote-abc/wait":
return make_response(request, {"ok": True})
if request.url.path == "/tabs/remote-abc/click":
self.assertEqual(
json.loads(request.content.decode("utf-8")),
{"userId": "agent1", "selector": "#submit"},
)
return make_response(request, {"ok": True})
raise AssertionError(f"unexpected request: {request.method} {request.url}")
app = self.make_app(handler)
navigate = app.dispatch_tool("navigate", {"url": "https://example.com"})
result = app.dispatch_tool("click", {"tab_id": navigate["tab_id"], "selector": "#submit"})
self.assertEqual(result, {"ok": True})
def test_fill_posts_type_request(self) -> None:
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/tabs":
return make_response(request, {"tabId": "remote-abc"})
if request.url.path == "/tabs/remote-abc/wait":
return make_response(request, {"ok": True})
if request.url.path == "/tabs/remote-abc/type":
self.assertEqual(
json.loads(request.content.decode("utf-8")),
{"userId": "agent1", "selector": "#username", "text": "snider"},
)
return make_response(request, {"ok": True})
raise AssertionError(f"unexpected request: {request.method} {request.url}")
app = self.make_app(handler)
navigate = app.dispatch_tool("navigate", {"url": "https://example.com"})
result = app.dispatch_tool(
"fill",
{"tab_id": navigate["tab_id"], "selector": "#username", "value": "snider"},
)
self.assertEqual(result, {"ok": True})
def test_close_tab_posts_delete_and_unregisters_handle(self) -> None:
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/tabs":
return make_response(request, {"tabId": "remote-abc"})
if request.url.path == "/tabs/remote-abc/wait":
return make_response(request, {"ok": True})
if request.method == "DELETE" and request.url.path == "/tabs/remote-abc":
self.assertEqual(json.loads(request.content.decode("utf-8")), {"userId": "agent1"})
return make_response(request, {"ok": True})
raise AssertionError(f"unexpected request: {request.method} {request.url}")
app = self.make_app(handler)
navigate = app.dispatch_tool("navigate", {"url": "https://example.com"})
result = app.dispatch_tool("close_tab", {"tab_id": navigate["tab_id"]})
self.assertEqual(result, {"ok": True})
with self.assertRaisesRegex(Exception, "Unknown tab_id"):
app.dispatch_tool("read_page", {"tab_id": navigate["tab_id"]})
class MinimalStdioMcpServerTests(unittest.TestCase):
def test_stdio_server_dispatches_initialize_tools_list_and_call(self) -> None:
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/tabs":
return make_response(request, {"tabId": "remote-abc", "status": "created"})
if request.url.path == "/tabs/remote-abc/wait":
return make_response(request, {"ok": True})
raise AssertionError(f"unexpected request: {request.method} {request.url}")
transport = httpx.MockTransport(handler)
client = httpx.Client(transport=transport, base_url="http://camofox.local")
camofox = CamofoxClient(
"http://camofox.local",
user_id="agent1",
session_key="session1",
client=client,
)
app = CamofoxMcpApplication(camofox)
self.addCleanup(app.close)
server = MinimalStdioMcpServer(app)
stdin = io.BytesIO(
b'{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"claude","version":"1.0"}}}\n'
b'{"jsonrpc":"2.0","method":"notifications/initialized"}\n'
b'{"jsonrpc":"2.0","id":2,"method":"tools/list"}\n'
b'{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"navigate","arguments":{"url":"https://example.com"}}}\n'
)
stdout = io.BytesIO()
server.serve(stdin, stdout)
responses = [json.loads(line) for line in stdout.getvalue().decode("utf-8").splitlines() if line.strip()]
self.assertEqual(responses[0]["result"]["protocolVersion"], "2025-03-26")
self.assertEqual(responses[1]["result"]["tools"][0]["name"], "navigate")
self.assertEqual(responses[2]["result"]["content"][0]["type"], "text")
self.assertIn('"tab_id":1', responses[2]["result"]["content"][0]["text"])
def test_content_length_framing_is_accepted(self) -> None:
payload = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "ping"}).encode("utf-8")
framed = io.BytesIO(f"Content-Length: {len(payload)}\r\n\r\n".encode("ascii") + payload)
message, framing = read_stdio_message(framed)
self.assertEqual(framing, "content-length")
self.assertEqual(json.loads(message), {"jsonrpc": "2.0", "id": 1, "method": "ping"})
if __name__ == "__main__":
unittest.main()