From 4fd2774614182ebaf74f2e7a8c04bbcf0b09ed96 Mon Sep 17 00:00:00 2001 From: Shaqayeq Date: Wed, 18 Mar 2026 17:57:48 -0700 Subject: [PATCH] Add Python SDK thread.run convenience methods (#15088) ## TL;DR Add `thread.run(...)` / `async thread.run(...)` convenience methods to the Python SDK for the common case. - add `RunInput = Input | str` and `RunResult` with `final_response`, collected `items`, and optional `usage` - keep `thread.turn(...)` strict and lower-level for streaming, steering, interrupting, and raw generated `Turn` access - update Python SDK docs, quickstart examples, and tests for the sync and async convenience flows ## Validation - `python3 -m pytest sdk/python/tests/test_public_api_signatures.py sdk/python/tests/test_public_api_runtime_behavior.py` - `python3 -m pytest sdk/python/tests/test_real_app_server_integration.py -k 'thread_run_convenience or async_thread_run_convenience'` (skipped in this environment) --------- Co-authored-by: Codex --- sdk/python/README.md | 13 +- sdk/python/docs/api-reference.md | 27 +- sdk/python/docs/getting-started.md | 38 +- .../01_quickstart_constructor/async.py | 14 +- .../01_quickstart_constructor/sync.py | 12 +- sdk/python/src/codex_app_server/__init__.py | 2 + sdk/python/src/codex_app_server/_inputs.py | 63 ++++ sdk/python/src/codex_app_server/_run.py | 112 ++++++ sdk/python/src/codex_app_server/api.py | 138 ++++--- .../tests/test_public_api_runtime_behavior.py | 348 +++++++++++++++++- .../tests/test_public_api_signatures.py | 30 +- .../tests/test_real_app_server_integration.py | 66 ++++ 12 files changed, 760 insertions(+), 103 deletions(-) create mode 100644 sdk/python/src/codex_app_server/_inputs.py create mode 100644 sdk/python/src/codex_app_server/_run.py diff --git a/sdk/python/README.md b/sdk/python/README.md index 993e4bcec..97068afe3 100644 --- a/sdk/python/README.md +++ b/sdk/python/README.md @@ -19,15 +19,18 @@ installs the pinned runtime package automatically. ## Quickstart ```python -from codex_app_server import Codex, TextInput +from codex_app_server import Codex with Codex() as codex: thread = codex.thread_start(model="gpt-5") - completed_turn = thread.turn(TextInput("Say hello in one sentence.")).run() - print(completed_turn.status) - print(completed_turn.id) + result = thread.run("Say hello in one sentence.") + print(result.final_response) + print(len(result.items)) ``` +`result.final_response` is `None` when the turn completes without a final-answer +or phase-less assistant message item. + ## Docs map - Golden path tutorial: `docs/getting-started.md` @@ -95,4 +98,6 @@ This supports the CI release flow: - `Codex()` is eager and performs startup + `initialize` in the constructor. - Use context managers (`with Codex() as codex:`) to ensure shutdown. +- Prefer `thread.run("...")` for the common case. Use `thread.turn(...)` when + you need streaming, steering, or interrupt control. - For transient overload, use `codex_app_server.retry.retry_on_overload`. diff --git a/sdk/python/docs/api-reference.md b/sdk/python/docs/api-reference.md index 29396b773..ddeaf39cd 100644 --- a/sdk/python/docs/api-reference.md +++ b/sdk/python/docs/api-reference.md @@ -2,7 +2,7 @@ Public surface of `codex_app_server` for app-server v2. -This SDK surface is experimental. The current implementation intentionally allows only one active `TurnHandle.stream()` or `TurnHandle.run()` consumer per client instance at a time. +This SDK surface is experimental. The current implementation intentionally allows only one active turn consumer (`Thread.run()`, `TurnHandle.stream()`, or `TurnHandle.run()`) per client instance at a time. ## Package Entry @@ -10,6 +10,7 @@ This SDK surface is experimental. The current implementation intentionally allow from codex_app_server import ( Codex, AsyncCodex, + RunResult, Thread, AsyncThread, TurnHandle, @@ -24,7 +25,7 @@ from codex_app_server import ( MentionInput, TurnStatus, ) -from codex_app_server.generated.v2_all import ThreadItem +from codex_app_server.generated.v2_all import ThreadItem, ThreadTokenUsage ``` - Version: `codex_app_server.__version__` @@ -97,6 +98,7 @@ async with AsyncCodex() as codex: ### Thread +- `run(input: str | Input, *, approval_policy=None, approvals_reviewer=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, service_tier=None, summary=None) -> RunResult` - `turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> TurnHandle` - `read(*, include_turns: bool = False) -> ThreadReadResponse` - `set_name(name: str) -> ThreadSetNameResponse` @@ -104,11 +106,26 @@ async with AsyncCodex() as codex: ### AsyncThread +- `run(input: str | Input, *, approval_policy=None, approvals_reviewer=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, service_tier=None, summary=None) -> Awaitable[RunResult]` - `turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> Awaitable[AsyncTurnHandle]` - `read(*, include_turns: bool = False) -> Awaitable[ThreadReadResponse]` - `set_name(name: str) -> Awaitable[ThreadSetNameResponse]` - `compact() -> Awaitable[ThreadCompactStartResponse]` +`run(...)` is the common-case convenience path. It accepts plain strings, starts +the turn, consumes notifications until completion, and returns a small result +object with: + +- `final_response: str | None` +- `items: list[ThreadItem]` +- `usage: ThreadTokenUsage | None` + +`final_response` is `None` when the turn finishes without a final-answer or +phase-less assistant message item. + +Use `turn(...)` when you need low-level turn control (`stream()`, `steer()`, +`interrupt()`) or the canonical generated `Turn` from `TurnHandle.run()`. + ## TurnHandle / AsyncTurnHandle ### TurnHandle @@ -181,10 +198,10 @@ from codex_app_server import ( ## Example ```python -from codex_app_server import Codex, TextInput +from codex_app_server import Codex with Codex() as codex: thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"}) - completed_turn = thread.turn(TextInput("Say hello in one sentence.")).run() - print(completed_turn.id, completed_turn.status) + result = thread.run("Say hello in one sentence.") + print(result.final_response) ``` diff --git a/sdk/python/docs/getting-started.md b/sdk/python/docs/getting-started.md index aaa6298d4..76034d72e 100644 --- a/sdk/python/docs/getting-started.md +++ b/sdk/python/docs/getting-started.md @@ -22,41 +22,42 @@ Requirements: ## 2) Run your first turn (sync) ```python -from codex_app_server import Codex, TextInput +from codex_app_server import Codex with Codex() as codex: server = codex.metadata.serverInfo print("Server:", None if server is None else server.name, None if server is None else server.version) thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"}) - completed_turn = thread.turn(TextInput("Say hello in one sentence.")).run() + result = thread.run("Say hello in one sentence.") print("Thread:", thread.id) - print("Turn:", completed_turn.id) - print("Status:", completed_turn.status) - print("Items:", len(completed_turn.items or [])) + print("Text:", result.final_response) + print("Items:", len(result.items)) ``` What happened: - `Codex()` started and initialized `codex app-server`. - `thread_start(...)` created a thread. -- `turn(...).run()` consumed events until `turn/completed` and returned the canonical generated app-server `Turn` model. -- one client can have only one active `TurnHandle.stream()` / `TurnHandle.run()` consumer at a time in the current experimental build +- `thread.run("...")` started a turn, consumed events until completion, and returned the final assistant response plus collected items and usage. +- `result.final_response` is `None` when no final-answer or phase-less assistant message item completes for the turn. +- use `thread.turn(...)` when you need a `TurnHandle` for streaming, steering, interrupting, or turn IDs/status +- one client can have only one active turn consumer (`thread.run(...)`, `TurnHandle.stream()`, or `TurnHandle.run()`) at a time in the current experimental build ## 3) Continue the same thread (multi-turn) ```python -from codex_app_server import Codex, TextInput +from codex_app_server import Codex with Codex() as codex: thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"}) - first = thread.turn(TextInput("Summarize Rust ownership in 2 bullets.")).run() - second = thread.turn(TextInput("Now explain it to a Python developer.")).run() + first = thread.run("Summarize Rust ownership in 2 bullets.") + second = thread.run("Now explain it to a Python developer.") - print("first:", first.id, first.status) - print("second:", second.id, second.status) + print("first:", first.final_response) + print("second:", second.final_response) ``` ## 4) Async parity @@ -66,15 +67,14 @@ initializes lazily, and context entry makes startup/shutdown explicit. ```python import asyncio -from codex_app_server import AsyncCodex, TextInput +from codex_app_server import AsyncCodex async def main() -> None: async with AsyncCodex() as codex: thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"}) - turn = await thread.turn(TextInput("Continue where we left off.")) - completed_turn = await turn.run() - print(completed_turn.id, completed_turn.status) + result = await thread.run("Continue where we left off.") + print(result.final_response) asyncio.run(main()) @@ -83,14 +83,14 @@ asyncio.run(main()) ## 5) Resume an existing thread ```python -from codex_app_server import Codex, TextInput +from codex_app_server import Codex THREAD_ID = "thr_123" # replace with a real id with Codex() as codex: thread = codex.thread_resume(THREAD_ID) - completed_turn = thread.turn(TextInput("Continue where we left off.")).run() - print(completed_turn.id, completed_turn.status) + result = thread.run("Continue where we left off.") + print(result.final_response) ``` ## 6) Generated models diff --git a/sdk/python/examples/01_quickstart_constructor/async.py b/sdk/python/examples/01_quickstart_constructor/async.py index cf525fa63..b9eedb76b 100644 --- a/sdk/python/examples/01_quickstart_constructor/async.py +++ b/sdk/python/examples/01_quickstart_constructor/async.py @@ -6,9 +6,7 @@ if str(_EXAMPLES_ROOT) not in sys.path: sys.path.insert(0, str(_EXAMPLES_ROOT)) from _bootstrap import ( - assistant_text_from_turn, ensure_local_sdk_src, - find_turn_by_id, runtime_config, server_label, ) @@ -17,7 +15,7 @@ ensure_local_sdk_src() import asyncio -from codex_app_server import AsyncCodex, TextInput +from codex_app_server import AsyncCodex async def main() -> None: @@ -25,13 +23,9 @@ async def main() -> None: print("Server:", server_label(codex.metadata)) thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"}) - turn = await thread.turn(TextInput("Say hello in one sentence.")) - result = await turn.run() - persisted = await thread.read(include_turns=True) - persisted_turn = find_turn_by_id(persisted.thread.turns, result.id) - - print("Status:", result.status) - print("Text:", assistant_text_from_turn(persisted_turn)) + result = await thread.run("Say hello in one sentence.") + print("Items:", len(result.items)) + print("Text:", result.final_response) if __name__ == "__main__": diff --git a/sdk/python/examples/01_quickstart_constructor/sync.py b/sdk/python/examples/01_quickstart_constructor/sync.py index 6abf29af3..6970d5a26 100644 --- a/sdk/python/examples/01_quickstart_constructor/sync.py +++ b/sdk/python/examples/01_quickstart_constructor/sync.py @@ -6,23 +6,19 @@ if str(_EXAMPLES_ROOT) not in sys.path: sys.path.insert(0, str(_EXAMPLES_ROOT)) from _bootstrap import ( - assistant_text_from_turn, ensure_local_sdk_src, - find_turn_by_id, runtime_config, server_label, ) ensure_local_sdk_src() -from codex_app_server import Codex, TextInput +from codex_app_server import Codex with Codex(config=runtime_config()) as codex: print("Server:", server_label(codex.metadata)) thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"}) - result = thread.turn(TextInput("Say hello in one sentence.")).run() - persisted = thread.read(include_turns=True) - persisted_turn = find_turn_by_id(persisted.thread.turns, result.id) - print("Status:", result.status) - print("Text:", assistant_text_from_turn(persisted_turn)) + result = thread.run("Say hello in one sentence.") + print("Items:", len(result.items)) + print("Text:", result.final_response) diff --git a/sdk/python/src/codex_app_server/__init__.py b/sdk/python/src/codex_app_server/__init__.py index 91f334df8..c35ce0ebe 100644 --- a/sdk/python/src/codex_app_server/__init__.py +++ b/sdk/python/src/codex_app_server/__init__.py @@ -47,6 +47,7 @@ from .api import ( InputItem, LocalImageInput, MentionInput, + RunResult, SkillInput, TextInput, Thread, @@ -68,6 +69,7 @@ __all__ = [ "TurnHandle", "AsyncTurnHandle", "InitializeResponse", + "RunResult", "Input", "InputItem", "TextInput", diff --git a/sdk/python/src/codex_app_server/_inputs.py b/sdk/python/src/codex_app_server/_inputs.py new file mode 100644 index 000000000..e3cd1c396 --- /dev/null +++ b/sdk/python/src/codex_app_server/_inputs.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from .models import JsonObject + + +@dataclass(slots=True) +class TextInput: + text: str + + +@dataclass(slots=True) +class ImageInput: + url: str + + +@dataclass(slots=True) +class LocalImageInput: + path: str + + +@dataclass(slots=True) +class SkillInput: + name: str + path: str + + +@dataclass(slots=True) +class MentionInput: + name: str + path: str + + +InputItem = TextInput | ImageInput | LocalImageInput | SkillInput | MentionInput +Input = list[InputItem] | InputItem +RunInput = Input | str + + +def _to_wire_item(item: InputItem) -> JsonObject: + if isinstance(item, TextInput): + return {"type": "text", "text": item.text} + if isinstance(item, ImageInput): + return {"type": "image", "url": item.url} + if isinstance(item, LocalImageInput): + return {"type": "localImage", "path": item.path} + if isinstance(item, SkillInput): + return {"type": "skill", "name": item.name, "path": item.path} + if isinstance(item, MentionInput): + return {"type": "mention", "name": item.name, "path": item.path} + raise TypeError(f"unsupported input item: {type(item)!r}") + + +def _to_wire_input(input: Input) -> list[JsonObject]: + if isinstance(input, list): + return [_to_wire_item(i) for i in input] + return [_to_wire_item(input)] + + +def _normalize_run_input(input: RunInput) -> Input: + if isinstance(input, str): + return TextInput(input) + return input diff --git a/sdk/python/src/codex_app_server/_run.py b/sdk/python/src/codex_app_server/_run.py new file mode 100644 index 000000000..73ec36246 --- /dev/null +++ b/sdk/python/src/codex_app_server/_run.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import AsyncIterator, Iterator + +from .generated.v2_all import ( + AgentMessageThreadItem, + ItemCompletedNotification, + MessagePhase, + ThreadItem, + ThreadTokenUsage, + ThreadTokenUsageUpdatedNotification, + Turn as AppServerTurn, + TurnCompletedNotification, + TurnStatus, +) +from .models import Notification + + +@dataclass(slots=True) +class RunResult: + final_response: str | None + items: list[ThreadItem] + usage: ThreadTokenUsage | None + + +def _agent_message_item_from_thread_item( + item: ThreadItem, +) -> AgentMessageThreadItem | None: + thread_item = item.root if hasattr(item, "root") else item + if isinstance(thread_item, AgentMessageThreadItem): + return thread_item + return None + + +def _final_assistant_response_from_items(items: list[ThreadItem]) -> str | None: + last_unknown_phase_response: str | None = None + + for item in reversed(items): + agent_message = _agent_message_item_from_thread_item(item) + if agent_message is None: + continue + if agent_message.phase == MessagePhase.final_answer: + return agent_message.text + if agent_message.phase is None and last_unknown_phase_response is None: + last_unknown_phase_response = agent_message.text + + return last_unknown_phase_response + + +def _raise_for_failed_turn(turn: AppServerTurn) -> None: + if turn.status != TurnStatus.failed: + return + if turn.error is not None and turn.error.message: + raise RuntimeError(turn.error.message) + raise RuntimeError(f"turn failed with status {turn.status.value}") + + +def _collect_run_result(stream: Iterator[Notification], *, turn_id: str) -> RunResult: + completed: TurnCompletedNotification | None = None + items: list[ThreadItem] = [] + usage: ThreadTokenUsage | None = None + + for event in stream: + payload = event.payload + if isinstance(payload, ItemCompletedNotification) and payload.turn_id == turn_id: + items.append(payload.item) + continue + if isinstance(payload, ThreadTokenUsageUpdatedNotification) and payload.turn_id == turn_id: + usage = payload.token_usage + continue + if isinstance(payload, TurnCompletedNotification) and payload.turn.id == turn_id: + completed = payload + + if completed is None: + raise RuntimeError("turn completed event not received") + + _raise_for_failed_turn(completed.turn) + return RunResult( + final_response=_final_assistant_response_from_items(items), + items=items, + usage=usage, + ) + + +async def _collect_async_run_result( + stream: AsyncIterator[Notification], *, turn_id: str +) -> RunResult: + completed: TurnCompletedNotification | None = None + items: list[ThreadItem] = [] + usage: ThreadTokenUsage | None = None + + async for event in stream: + payload = event.payload + if isinstance(payload, ItemCompletedNotification) and payload.turn_id == turn_id: + items.append(payload.item) + continue + if isinstance(payload, ThreadTokenUsageUpdatedNotification) and payload.turn_id == turn_id: + usage = payload.token_usage + continue + if isinstance(payload, TurnCompletedNotification) and payload.turn.id == turn_id: + completed = payload + + if completed is None: + raise RuntimeError("turn completed event not received") + + _raise_for_failed_turn(completed.turn) + return RunResult( + final_response=_final_assistant_response_from_items(items), + items=items, + usage=usage, + ) diff --git a/sdk/python/src/codex_app_server/api.py b/sdk/python/src/codex_app_server/api.py index b465c574d..5009d9bbf 100644 --- a/sdk/python/src/codex_app_server/api.py +++ b/sdk/python/src/codex_app_server/api.py @@ -7,6 +7,7 @@ from typing import AsyncIterator, Iterator from .async_client import AsyncAppServerClient from .client import AppServerClient, AppServerConfig from .generated.v2_all import ( + ApprovalsReviewer, AskForApproval, ModelListResponse, Personality, @@ -18,7 +19,6 @@ from .generated.v2_all import ( ThreadArchiveResponse, ThreadCompactStartResponse, ThreadForkParams, - ThreadItem, ThreadListParams, ThreadListResponse, ThreadReadResponse, @@ -34,57 +34,23 @@ from .generated.v2_all import ( TurnSteerResponse, ) from .models import InitializeResponse, JsonObject, Notification, ServerInfo - - -@dataclass(slots=True) -class TextInput: - text: str - - -@dataclass(slots=True) -class ImageInput: - url: str - - -@dataclass(slots=True) -class LocalImageInput: - path: str - - -@dataclass(slots=True) -class SkillInput: - name: str - path: str - - -@dataclass(slots=True) -class MentionInput: - name: str - path: str - - -InputItem = TextInput | ImageInput | LocalImageInput | SkillInput | MentionInput -Input = list[InputItem] | InputItem - - -def _to_wire_item(item: InputItem) -> JsonObject: - if isinstance(item, TextInput): - return {"type": "text", "text": item.text} - if isinstance(item, ImageInput): - return {"type": "image", "url": item.url} - if isinstance(item, LocalImageInput): - return {"type": "localImage", "path": item.path} - if isinstance(item, SkillInput): - return {"type": "skill", "name": item.name, "path": item.path} - if isinstance(item, MentionInput): - return {"type": "mention", "name": item.name, "path": item.path} - raise TypeError(f"unsupported input item: {type(item)!r}") - - -def _to_wire_input(input: Input) -> list[JsonObject]: - if isinstance(input, list): - return [_to_wire_item(i) for i in input] - return [_to_wire_item(input)] +from ._inputs import ( + ImageInput, + Input, + InputItem, + LocalImageInput, + MentionInput, + RunInput, + SkillInput, + TextInput, + _normalize_run_input, + _to_wire_input, +) +from ._run import ( + RunResult, + _collect_async_run_result, + _collect_run_result, +) def _split_user_agent(user_agent: str) -> tuple[str | None, str | None]: @@ -503,6 +469,40 @@ class Thread: _client: AppServerClient id: str + def run( + self, + input: RunInput, + *, + approval_policy: AskForApproval | None = None, + approvals_reviewer: ApprovalsReviewer | None = None, + cwd: str | None = None, + effort: ReasoningEffort | None = None, + model: str | None = None, + output_schema: JsonObject | None = None, + personality: Personality | None = None, + sandbox_policy: SandboxPolicy | None = None, + service_tier: ServiceTier | None = None, + summary: ReasoningSummary | None = None, + ) -> RunResult: + turn = self.turn( + _normalize_run_input(input), + approval_policy=approval_policy, + approvals_reviewer=approvals_reviewer, + cwd=cwd, + effort=effort, + model=model, + output_schema=output_schema, + personality=personality, + sandbox_policy=sandbox_policy, + service_tier=service_tier, + summary=summary, + ) + stream = turn.stream() + try: + return _collect_run_result(stream, turn_id=turn.id) + finally: + stream.close() + # BEGIN GENERATED: Thread.flat_methods def turn( self, @@ -553,6 +553,40 @@ class AsyncThread: _codex: AsyncCodex id: str + async def run( + self, + input: RunInput, + *, + approval_policy: AskForApproval | None = None, + approvals_reviewer: ApprovalsReviewer | None = None, + cwd: str | None = None, + effort: ReasoningEffort | None = None, + model: str | None = None, + output_schema: JsonObject | None = None, + personality: Personality | None = None, + sandbox_policy: SandboxPolicy | None = None, + service_tier: ServiceTier | None = None, + summary: ReasoningSummary | None = None, + ) -> RunResult: + turn = await self.turn( + _normalize_run_input(input), + approval_policy=approval_policy, + approvals_reviewer=approvals_reviewer, + cwd=cwd, + effort=effort, + model=model, + output_schema=output_schema, + personality=personality, + sandbox_policy=sandbox_policy, + service_tier=service_tier, + summary=summary, + ) + stream = turn.stream() + try: + return await _collect_async_run_result(stream, turn_id=turn.id) + finally: + await stream.aclose() + # BEGIN GENERATED: AsyncThread.flat_methods async def turn( self, diff --git a/sdk/python/tests/test_public_api_runtime_behavior.py b/sdk/python/tests/test_public_api_runtime_behavior.py index dfddd3196..10865cf87 100644 --- a/sdk/python/tests/test_public_api_runtime_behavior.py +++ b/sdk/python/tests/test_public_api_runtime_behavior.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio from collections import deque from pathlib import Path +from types import SimpleNamespace import pytest @@ -10,14 +11,20 @@ import codex_app_server.api as public_api_module from codex_app_server.client import AppServerClient from codex_app_server.generated.v2_all import ( AgentMessageDeltaNotification, + ItemCompletedNotification, + MessagePhase, + ThreadTokenUsageUpdatedNotification, TurnCompletedNotification, TurnStatus, ) from codex_app_server.models import InitializeResponse, Notification from codex_app_server.api import ( AsyncCodex, + AsyncThread, AsyncTurnHandle, Codex, + RunResult, + Thread, TurnHandle, ) @@ -48,16 +55,78 @@ def _completed_notification( thread_id: str = "thread-1", turn_id: str = "turn-1", status: str = "completed", + error_message: str | None = None, ) -> Notification: + turn: dict[str, object] = { + "id": turn_id, + "items": [], + "status": status, + } + if error_message is not None: + turn["error"] = {"message": error_message} return Notification( method="turn/completed", payload=TurnCompletedNotification.model_validate( { "threadId": thread_id, - "turn": { - "id": turn_id, - "items": [], - "status": status, + "turn": turn, + } + ), + ) + + +def _item_completed_notification( + *, + thread_id: str = "thread-1", + turn_id: str = "turn-1", + text: str = "final text", + phase: MessagePhase | None = None, +) -> Notification: + item: dict[str, object] = { + "id": "item-1", + "text": text, + "type": "agentMessage", + } + if phase is not None: + item["phase"] = phase.value + return Notification( + method="item/completed", + payload=ItemCompletedNotification.model_validate( + { + "item": item, + "threadId": thread_id, + "turnId": turn_id, + } + ), + ) + + +def _token_usage_notification( + *, + thread_id: str = "thread-1", + turn_id: str = "turn-1", +) -> Notification: + return Notification( + method="thread/tokenUsage/updated", + payload=ThreadTokenUsageUpdatedNotification.model_validate( + { + "threadId": thread_id, + "turnId": turn_id, + "tokenUsage": { + "last": { + "cachedInputTokens": 1, + "inputTokens": 2, + "outputTokens": 3, + "reasoningOutputTokens": 4, + "totalTokens": 9, + }, + "total": { + "cachedInputTokens": 5, + "inputTokens": 6, + "outputTokens": 7, + "reasoningOutputTokens": 8, + "totalTokens": 26, + }, }, } ), @@ -225,6 +294,277 @@ def test_turn_run_returns_completed_turn_payload() -> None: assert result.items == [] +def test_thread_run_accepts_string_input_and_returns_run_result() -> None: + client = AppServerClient() + item_notification = _item_completed_notification(text="Hello.") + usage_notification = _token_usage_notification() + notifications: deque[Notification] = deque( + [ + item_notification, + usage_notification, + _completed_notification(), + ] + ) + client.next_notification = notifications.popleft # type: ignore[method-assign] + seen: dict[str, object] = {} + + def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202 + seen["thread_id"] = thread_id + seen["wire_input"] = wire_input + seen["params"] = params + return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) + + client.turn_start = fake_turn_start # type: ignore[method-assign] + + result = Thread(client, "thread-1").run("hello") + + assert seen["thread_id"] == "thread-1" + assert seen["wire_input"] == [{"type": "text", "text": "hello"}] + assert result == RunResult( + final_response="Hello.", + items=[item_notification.payload.item], + usage=usage_notification.payload.token_usage, + ) + + +def test_thread_run_uses_last_completed_assistant_message_as_final_response() -> None: + client = AppServerClient() + first_item_notification = _item_completed_notification(text="First message") + second_item_notification = _item_completed_notification(text="Second message") + notifications: deque[Notification] = deque( + [ + first_item_notification, + second_item_notification, + _completed_notification(), + ] + ) + client.next_notification = notifications.popleft # type: ignore[method-assign] + client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 + turn=SimpleNamespace(id="turn-1") + ) + + result = Thread(client, "thread-1").run("hello") + + assert result.final_response == "Second message" + assert result.items == [ + first_item_notification.payload.item, + second_item_notification.payload.item, + ] + + +def test_thread_run_preserves_empty_last_assistant_message() -> None: + client = AppServerClient() + first_item_notification = _item_completed_notification(text="First message") + second_item_notification = _item_completed_notification(text="") + notifications: deque[Notification] = deque( + [ + first_item_notification, + second_item_notification, + _completed_notification(), + ] + ) + client.next_notification = notifications.popleft # type: ignore[method-assign] + client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 + turn=SimpleNamespace(id="turn-1") + ) + + result = Thread(client, "thread-1").run("hello") + + assert result.final_response == "" + assert result.items == [ + first_item_notification.payload.item, + second_item_notification.payload.item, + ] + + +def test_thread_run_prefers_explicit_final_answer_over_later_commentary() -> None: + client = AppServerClient() + final_answer_notification = _item_completed_notification( + text="Final answer", + phase=MessagePhase.final_answer, + ) + commentary_notification = _item_completed_notification( + text="Commentary", + phase=MessagePhase.commentary, + ) + notifications: deque[Notification] = deque( + [ + final_answer_notification, + commentary_notification, + _completed_notification(), + ] + ) + client.next_notification = notifications.popleft # type: ignore[method-assign] + client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 + turn=SimpleNamespace(id="turn-1") + ) + + result = Thread(client, "thread-1").run("hello") + + assert result.final_response == "Final answer" + assert result.items == [ + final_answer_notification.payload.item, + commentary_notification.payload.item, + ] + + +def test_thread_run_returns_none_when_only_commentary_messages_complete() -> None: + client = AppServerClient() + commentary_notification = _item_completed_notification( + text="Commentary", + phase=MessagePhase.commentary, + ) + notifications: deque[Notification] = deque( + [ + commentary_notification, + _completed_notification(), + ] + ) + client.next_notification = notifications.popleft # type: ignore[method-assign] + client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 + turn=SimpleNamespace(id="turn-1") + ) + + result = Thread(client, "thread-1").run("hello") + + assert result.final_response is None + assert result.items == [commentary_notification.payload.item] + + +def test_thread_run_raises_on_failed_turn() -> None: + client = AppServerClient() + notifications: deque[Notification] = deque( + [ + _completed_notification(status="failed", error_message="boom"), + ] + ) + client.next_notification = notifications.popleft # type: ignore[method-assign] + client.turn_start = lambda thread_id, wire_input, *, params=None: SimpleNamespace( # noqa: ARG005,E731 + turn=SimpleNamespace(id="turn-1") + ) + + with pytest.raises(RuntimeError, match="boom"): + Thread(client, "thread-1").run("hello") + + +def test_async_thread_run_accepts_string_input_and_returns_run_result() -> None: + async def scenario() -> None: + codex = AsyncCodex() + + async def fake_ensure_initialized() -> None: + return None + + item_notification = _item_completed_notification(text="Hello async.") + usage_notification = _token_usage_notification() + notifications: deque[Notification] = deque( + [ + item_notification, + usage_notification, + _completed_notification(), + ] + ) + seen: dict[str, object] = {} + + async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202 + seen["thread_id"] = thread_id + seen["wire_input"] = wire_input + seen["params"] = params + return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) + + async def fake_next_notification() -> Notification: + return notifications.popleft() + + codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] + codex._client.turn_start = fake_turn_start # type: ignore[method-assign] + codex._client.next_notification = fake_next_notification # type: ignore[method-assign] + + result = await AsyncThread(codex, "thread-1").run("hello") + + assert seen["thread_id"] == "thread-1" + assert seen["wire_input"] == [{"type": "text", "text": "hello"}] + assert result == RunResult( + final_response="Hello async.", + items=[item_notification.payload.item], + usage=usage_notification.payload.token_usage, + ) + + asyncio.run(scenario()) + + +def test_async_thread_run_uses_last_completed_assistant_message_as_final_response() -> None: + async def scenario() -> None: + codex = AsyncCodex() + + async def fake_ensure_initialized() -> None: + return None + + first_item_notification = _item_completed_notification(text="First async message") + second_item_notification = _item_completed_notification(text="Second async message") + notifications: deque[Notification] = deque( + [ + first_item_notification, + second_item_notification, + _completed_notification(), + ] + ) + + async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001 + return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) + + async def fake_next_notification() -> Notification: + return notifications.popleft() + + codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] + codex._client.turn_start = fake_turn_start # type: ignore[method-assign] + codex._client.next_notification = fake_next_notification # type: ignore[method-assign] + + result = await AsyncThread(codex, "thread-1").run("hello") + + assert result.final_response == "Second async message" + assert result.items == [ + first_item_notification.payload.item, + second_item_notification.payload.item, + ] + + asyncio.run(scenario()) + + +def test_async_thread_run_returns_none_when_only_commentary_messages_complete() -> None: + async def scenario() -> None: + codex = AsyncCodex() + + async def fake_ensure_initialized() -> None: + return None + + commentary_notification = _item_completed_notification( + text="Commentary", + phase=MessagePhase.commentary, + ) + notifications: deque[Notification] = deque( + [ + commentary_notification, + _completed_notification(), + ] + ) + + async def fake_turn_start(thread_id: str, wire_input: object, *, params=None): # noqa: ANN001,ANN202,ARG001 + return SimpleNamespace(turn=SimpleNamespace(id="turn-1")) + + async def fake_next_notification() -> Notification: + return notifications.popleft() + + codex._ensure_initialized = fake_ensure_initialized # type: ignore[method-assign] + codex._client.turn_start = fake_turn_start # type: ignore[method-assign] + codex._client.next_notification = fake_next_notification # type: ignore[method-assign] + + result = await AsyncThread(codex, "thread-1").run("hello") + + assert result.final_response is None + assert result.items == [commentary_notification.payload.item] + + asyncio.run(scenario()) + + def test_retry_examples_compare_status_with_enum() -> None: for path in ( ROOT / "examples" / "10_error_handling_and_retry" / "sync.py", diff --git a/sdk/python/tests/test_public_api_signatures.py b/sdk/python/tests/test_public_api_signatures.py index 4ac051c03..ce1b84725 100644 --- a/sdk/python/tests/test_public_api_signatures.py +++ b/sdk/python/tests/test_public_api_signatures.py @@ -4,7 +4,7 @@ import importlib.resources as resources import inspect from typing import Any -from codex_app_server import AppServerConfig +from codex_app_server import AppServerConfig, RunResult from codex_app_server.models import InitializeResponse from codex_app_server.api import AsyncCodex, AsyncThread, Codex, Thread @@ -31,6 +31,10 @@ def test_root_exports_app_server_config() -> None: assert AppServerConfig.__name__ == "AppServerConfig" +def test_root_exports_run_result() -> None: + assert RunResult.__name__ == "RunResult" + + def test_package_includes_py_typed_marker() -> None: marker = resources.files("codex_app_server").joinpath("py.typed") assert marker.is_file() @@ -101,6 +105,18 @@ def test_generated_public_signatures_are_snake_case_and_typed() -> None: "service_tier", "summary", ], + Thread.run: [ + "approval_policy", + "approvals_reviewer", + "cwd", + "effort", + "model", + "output_schema", + "personality", + "sandbox_policy", + "service_tier", + "summary", + ], AsyncCodex.thread_start: [ "approval_policy", "approvals_reviewer", @@ -164,6 +180,18 @@ def test_generated_public_signatures_are_snake_case_and_typed() -> None: "service_tier", "summary", ], + AsyncThread.run: [ + "approval_policy", + "approvals_reviewer", + "cwd", + "effort", + "model", + "output_schema", + "personality", + "sandbox_policy", + "service_tier", + "summary", + ], } for fn, expected_kwargs in expected.items(): diff --git a/sdk/python/tests/test_real_app_server_integration.py b/sdk/python/tests/test_real_app_server_integration.py index 3790e37dc..b5e37c444 100644 --- a/sdk/python/tests/test_real_app_server_integration.py +++ b/sdk/python/tests/test_real_app_server_integration.py @@ -265,6 +265,36 @@ def test_real_thread_and_turn_start_smoke(runtime_env: PreparedRuntimeEnv) -> No assert isinstance(data["persisted_items_count"], int) +def test_real_thread_run_convenience_smoke(runtime_env: PreparedRuntimeEnv) -> None: + data = _run_json_python( + runtime_env, + textwrap.dedent( + """ + import json + from codex_app_server import Codex + + with Codex() as codex: + thread = codex.thread_start( + model="gpt-5.4", + config={"model_reasoning_effort": "high"}, + ) + result = thread.run("say ok") + print(json.dumps({ + "thread_id": thread.id, + "final_response": result.final_response, + "items_count": len(result.items), + "has_usage": result.usage is not None, + })) + """ + ), + ) + + assert isinstance(data["thread_id"], str) and data["thread_id"].strip() + assert isinstance(data["final_response"], str) and data["final_response"].strip() + assert isinstance(data["items_count"], int) + assert isinstance(data["has_usage"], bool) + + def test_real_async_thread_turn_usage_and_ids_smoke( runtime_env: PreparedRuntimeEnv, ) -> None: @@ -308,6 +338,42 @@ def test_real_async_thread_turn_usage_and_ids_smoke( assert isinstance(data["persisted_items_count"], int) +def test_real_async_thread_run_convenience_smoke( + runtime_env: PreparedRuntimeEnv, +) -> None: + data = _run_json_python( + runtime_env, + textwrap.dedent( + """ + import asyncio + import json + from codex_app_server import AsyncCodex + + async def main(): + async with AsyncCodex() as codex: + thread = await codex.thread_start( + model="gpt-5.4", + config={"model_reasoning_effort": "high"}, + ) + result = await thread.run("say ok") + print(json.dumps({ + "thread_id": thread.id, + "final_response": result.final_response, + "items_count": len(result.items), + "has_usage": result.usage is not None, + })) + + asyncio.run(main()) + """ + ), + ) + + assert isinstance(data["thread_id"], str) and data["thread_id"].strip() + assert isinstance(data["final_response"], str) and data["final_response"].strip() + assert isinstance(data["items_count"], int) + assert isinstance(data["has_usage"], bool) + + def test_notebook_bootstrap_resolves_sdk_and_runtime_from_unrelated_cwd( runtime_env: PreparedRuntimeEnv, ) -> None: