feat(app-server): turn/steer API (#10821)

This PR adds a dedicated `turn/steer` API for appending user input to an
in-flight turn.

## Motivation
Currently, steering in the app is implemented by just calling
`turn/start` while a turn is running. This has some really weird quirks:
- Client gets back a new `turn.id`, even though streamed
events/approvals remained tied to the original active turn ID.
- All the various turn-level override params on `turn/start` do not
apply to the "steer", and would only apply to the next real turn.
- There can also be a race condition where the client thinks the turn is
active but the server has already completed it, so there might be bugs
if the client has baked in some client-specific behavior thinking it's a
steer when in fact the server kicked off a new turn. This is
particularly possible when running a client against a remote app-server.

Having a dedicated `turn/steer` API eliminates all those quirks.

`turn/steer` behavior:
- Requires an active turn on threadId. Returns a JSON-RPC error if there
is no active turn.
- If expectedTurnId is provided, it must match the active turn (more
useful when connecting to a remote app-server).
- Does not emit `turn/started`.
- Does not accept turn overrides (`cwd`, `model`, `sandbox`, etc.) or
`outputSchema` to accurately reflect that these are not applied when
steering.
This commit is contained in:
Owen Lin 2026-02-05 16:35:04 -08:00 committed by GitHub
parent 729b016515
commit 0d8b2b74c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 768 additions and 13 deletions

View file

@ -2810,6 +2810,29 @@
],
"type": "object"
},
"TurnSteerParams": {
"properties": {
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"
},
"input": {
"items": {
"$ref": "#/definitions/UserInput"
},
"type": "array"
},
"threadId": {
"type": "string"
}
},
"required": [
"expectedTurnId",
"input",
"threadId"
],
"type": "object"
},
"UserInput": {
"oneOf": [
{
@ -3511,6 +3534,30 @@
"title": "Turn/startRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"turn/steer"
],
"title": "Turn/steerRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/TurnSteerParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Turn/steerRequest",
"type": "object"
},
{
"properties": {
"id": {

View file

@ -862,6 +862,30 @@
"title": "Turn/startRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"turn/steer"
],
"title": "Turn/steerRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/TurnSteerParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Turn/steerRequest",
"type": "object"
},
{
"properties": {
"id": {
@ -15729,6 +15753,44 @@
],
"type": "string"
},
"TurnSteerParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"
},
"input": {
"items": {
"$ref": "#/definitions/v2/UserInput"
},
"type": "array"
},
"threadId": {
"type": "string"
}
},
"required": [
"expectedTurnId",
"input",
"threadId"
],
"title": "TurnSteerParams",
"type": "object"
},
"TurnSteerResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"turnId": {
"type": "string"
}
},
"required": [
"turnId"
],
"title": "TurnSteerResponse",
"type": "object"
},
"UserInput": {
"oneOf": [
{

View file

@ -0,0 +1,189 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"ByteRange": {
"properties": {
"end": {
"format": "uint",
"minimum": 0.0,
"type": "integer"
},
"start": {
"format": "uint",
"minimum": 0.0,
"type": "integer"
}
},
"required": [
"end",
"start"
],
"type": "object"
},
"TextElement": {
"properties": {
"byteRange": {
"allOf": [
{
"$ref": "#/definitions/ByteRange"
}
],
"description": "Byte range in the parent `text` buffer that this element occupies."
},
"placeholder": {
"description": "Optional human-readable placeholder for the element, displayed in the UI.",
"type": [
"string",
"null"
]
}
},
"required": [
"byteRange"
],
"type": "object"
},
"UserInput": {
"oneOf": [
{
"properties": {
"text": {
"type": "string"
},
"text_elements": {
"default": [],
"description": "UI-defined spans within `text` used to render or persist special elements.",
"items": {
"$ref": "#/definitions/TextElement"
},
"type": "array"
},
"type": {
"enum": [
"text"
],
"title": "TextUserInputType",
"type": "string"
}
},
"required": [
"text",
"type"
],
"title": "TextUserInput",
"type": "object"
},
{
"properties": {
"type": {
"enum": [
"image"
],
"title": "ImageUserInputType",
"type": "string"
},
"url": {
"type": "string"
}
},
"required": [
"type",
"url"
],
"title": "ImageUserInput",
"type": "object"
},
{
"properties": {
"path": {
"type": "string"
},
"type": {
"enum": [
"localImage"
],
"title": "LocalImageUserInputType",
"type": "string"
}
},
"required": [
"path",
"type"
],
"title": "LocalImageUserInput",
"type": "object"
},
{
"properties": {
"name": {
"type": "string"
},
"path": {
"type": "string"
},
"type": {
"enum": [
"skill"
],
"title": "SkillUserInputType",
"type": "string"
}
},
"required": [
"name",
"path",
"type"
],
"title": "SkillUserInput",
"type": "object"
},
{
"properties": {
"name": {
"type": "string"
},
"path": {
"type": "string"
},
"type": {
"enum": [
"mention"
],
"title": "MentionUserInputType",
"type": "string"
}
},
"required": [
"name",
"path",
"type"
],
"title": "MentionUserInput",
"type": "object"
}
]
}
},
"properties": {
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"
},
"input": {
"items": {
"$ref": "#/definitions/UserInput"
},
"type": "array"
},
"threadId": {
"type": "string"
}
},
"required": [
"expectedTurnId",
"input",
"threadId"
],
"title": "TurnSteerParams",
"type": "object"
}

View file

@ -0,0 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"turnId": {
"type": "string"
}
},
"required": [
"turnId"
],
"title": "TurnSteerResponse",
"type": "object"
}

View file

@ -52,8 +52,9 @@ import type { ThreadStartParams } from "./v2/ThreadStartParams";
import type { ThreadUnarchiveParams } from "./v2/ThreadUnarchiveParams";
import type { TurnInterruptParams } from "./v2/TurnInterruptParams";
import type { TurnStartParams } from "./v2/TurnStartParams";
import type { TurnSteerParams } from "./v2/TurnSteerParams";
/**
* Request from the client to the server.
*/
export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/read", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/write", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "experimentalFeature/list", id: RequestId, params: ExperimentalFeatureListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, };
export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/read", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/write", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/steer", id: RequestId, params: TurnSteerParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "experimentalFeature/list", id: RequestId, params: ExperimentalFeatureListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, };

View file

@ -0,0 +1,11 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { UserInput } from "./UserInput";
export type TurnSteerParams = { threadId: string, input: Array<UserInput>,
/**
* Required active turn id precondition. The request fails when it does not
* match the currently active turn.
*/
expectedTurnId: string, };

View file

@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type TurnSteerResponse = { turnId: string, };

View file

@ -180,6 +180,8 @@ export type { TurnStartParams } from "./TurnStartParams";
export type { TurnStartResponse } from "./TurnStartResponse";
export type { TurnStartedNotification } from "./TurnStartedNotification";
export type { TurnStatus } from "./TurnStatus";
export type { TurnSteerParams } from "./TurnSteerParams";
export type { TurnSteerResponse } from "./TurnSteerResponse";
export type { UserInput } from "./UserInput";
export type { WebSearchAction } from "./WebSearchAction";
export type { WindowsWorldWritableWarningNotification } from "./WindowsWorldWritableWarningNotification";

View file

@ -252,6 +252,10 @@ client_request_definitions! {
params: v2::TurnStartParams,
response: v2::TurnStartResponse,
},
TurnSteer => "turn/steer" {
params: v2::TurnSteerParams,
response: v2::TurnSteerResponse,
},
TurnInterrupt => "turn/interrupt" {
params: v2::TurnInterruptParams,
response: v2::TurnInterruptResponse,

View file

@ -2092,6 +2092,24 @@ pub struct TurnStartResponse {
pub turn: Turn,
}
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnSteerParams {
pub thread_id: String,
pub input: Vec<UserInput>,
/// Required active turn id precondition. The request fails when it does not
/// match the currently active turn.
pub expected_turn_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnSteerResponse {
pub turn_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View file

@ -95,6 +95,7 @@ Example (from OpenAI's official VSCode extension):
- `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
- `turn/steer` — add user input to an already in-flight turn without starting a new turn; returns the active `turnId` that accepted the input.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `review/start` — kick off Codexs automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.
- `command/exec` — run a single command under the server sandbox without starting a thread/turn (handy for utilities and validation).
@ -363,6 +364,22 @@ You can cancel a running Turn with `turn/interrupt`.
The server requests cancellations for running subprocesses, then emits a `turn/completed` event with `status: "interrupted"`. Rely on the `turn/completed` to know when Codex-side cleanup is done.
### Example: Steer an active turn
Use `turn/steer` to append additional user input to the currently active turn. This does not emit
`turn/started` and does not accept turn context overrides.
```json
{ "method": "turn/steer", "id": 32, "params": {
"threadId": "thr_123",
"input": [ { "type": "text", "text": "Actually focus on failing tests first." } ],
"expectedTurnId": "turn_456"
} }
{ "id": 32, "result": { "turnId": "turn_456" } }
```
`expectedTurnId` is required. If there is no active turn (or `expectedTurnId` does not match the active turn), the request fails with an `invalid request` error.
### Example: Request a code review
Use `review/start` to run Codexs reviewer on the currently checked-out project. The request takes the thread id plus a `target` describing what should be reviewed:

View file

@ -139,6 +139,8 @@ use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_app_server_protocol::UserSavedConfig;
@ -154,6 +156,7 @@ use codex_core::InitialHistory;
use codex_core::NewThread;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
use codex_core::SteerInputError;
use codex_core::ThreadConfigSnapshot;
use codex_core::ThreadManager;
use codex_core::ThreadSortKey as CoreThreadSortKey;
@ -532,6 +535,10 @@ impl CodexMessageProcessor {
self.turn_start(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::TurnSteer { request_id, params } => {
self.turn_steer(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::TurnInterrupt { request_id, params } => {
self.turn_interrupt(to_connection_request_id(request_id), params)
.await;
@ -4620,6 +4627,63 @@ impl CodexMessageProcessor {
}
}
async fn turn_steer(&self, request_id: ConnectionRequestId, params: TurnSteerParams) {
let (_, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
if params.expected_turn_id.is_empty() {
self.send_invalid_request_error(
request_id,
"expectedTurnId must not be empty".to_string(),
)
.await;
return;
}
let mapped_items: Vec<CoreInputItem> = params
.input
.into_iter()
.map(V2UserInput::into_core)
.collect();
match thread
.steer_input(mapped_items, Some(&params.expected_turn_id))
.await
{
Ok(turn_id) => {
let response = TurnSteerResponse { turn_id };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let (code, message) = match err {
SteerInputError::NoActiveTurn(_) => (
INVALID_REQUEST_ERROR_CODE,
"no active turn to steer".to_string(),
),
SteerInputError::ExpectedTurnMismatch { expected, actual } => (
INVALID_REQUEST_ERROR_CODE,
format!("expected active turn id `{expected}` but found `{actual}`"),
),
SteerInputError::EmptyInput => (
INVALID_REQUEST_ERROR_CODE,
"input must not be empty".to_string(),
),
};
let error = JSONRPCErrorError {
code,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
fn build_review_turn(turn_id: String, display_text: &str) -> Turn {
let items = if display_text.is_empty() {
Vec::new()

View file

@ -62,6 +62,7 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnSteerParams;
use codex_core::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR;
use tokio::process::Command;
@ -557,6 +558,15 @@ impl McpProcess {
self.send_request("turn/interrupt", params).await
}
/// Send a `turn/steer` JSON-RPC request (v2).
pub async fn send_turn_steer_request(
&mut self,
params: TurnSteerParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("turn/steer", params).await
}
/// Send a `review/start` JSON-RPC request (v2).
pub async fn send_review_start_request(
&mut self,

View file

@ -26,3 +26,4 @@ mod thread_start;
mod thread_unarchive;
mod turn_interrupt;
mod turn_start;
mod turn_steer;

View file

@ -0,0 +1,179 @@
#![cfg(unix)]
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn turn_steer_requires_active_turn() -> Result<()> {
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let server = create_mock_responses_server_sequence(vec![]).await;
create_config_toml(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let steer_req = mcp
.send_turn_steer_request(TurnSteerParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "steer".to_string(),
text_elements: Vec::new(),
}],
expected_turn_id: "turn-does-not-exist".to_string(),
})
.await?;
let steer_err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(steer_req)),
)
.await??;
assert_eq!(steer_err.error.code, -32600);
Ok(())
}
#[tokio::test]
async fn turn_steer_returns_active_turn_id() -> Result<()> {
#[cfg(target_os = "windows")]
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 10".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "10".to_string()];
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory)?;
let server =
create_mock_responses_server_sequence_unchecked(vec![create_shell_command_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep",
)?])
.await;
create_config_toml(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "run sleep".to_string(),
text_elements: Vec::new(),
}],
cwd: Some(working_directory.clone()),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let _task_started: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_started"),
)
.await??;
let steer_req = mcp
.send_turn_steer_request(TurnSteerParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "steer".to_string(),
text_elements: Vec::new(),
}],
expected_turn_id: turn.id.clone(),
})
.await?;
let steer_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(steer_req)),
)
.await??;
let steer: TurnSteerResponse = to_response::<TurnSteerResponse>(steer_resp)?;
assert_eq!(steer.turn_id, turn.id);
Ok(())
}
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View file

@ -118,6 +118,13 @@ use crate::error::CodexErr;
use crate::error::Result as CodexResult;
#[cfg(test)]
use crate::exec::StreamOutput;
#[derive(Debug, PartialEq)]
pub enum SteerInputError {
NoActiveTurn(Vec<UserInput>),
ExpectedTurnMismatch { expected: String, actual: String },
EmptyInput,
}
use crate::exec_policy::ExecPolicyUpdateError;
use crate::feedback_tags;
use crate::file_watcher::FileWatcher;
@ -455,6 +462,14 @@ impl Codex {
Ok(event)
}
pub async fn steer_input(
&self,
input: Vec<UserInput>,
expected_turn_id: Option<&str>,
) -> Result<String, SteerInputError> {
self.session.steer_input(input, expected_turn_id).await
}
pub(crate) async fn agent_status(&self) -> AgentStatus {
self.agent_status.borrow().clone()
}
@ -2327,17 +2342,39 @@ impl Session {
.await;
}
/// Returns the input if there was no task running to inject into
pub async fn inject_input(&self, input: Vec<UserInput>) -> Result<(), Vec<UserInput>> {
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.push_pending_input(input.into());
Ok(())
}
None => Err(input),
/// Inject additional user input into the currently active turn.
///
/// Returns the active turn id when accepted.
pub async fn steer_input(
&self,
input: Vec<UserInput>,
expected_turn_id: Option<&str>,
) -> Result<String, SteerInputError> {
if input.is_empty() {
return Err(SteerInputError::EmptyInput);
}
let mut active = self.active_turn.lock().await;
let Some(active_turn) = active.as_mut() else {
return Err(SteerInputError::NoActiveTurn(input));
};
let Some((active_turn_id, _)) = active_turn.tasks.first() else {
return Err(SteerInputError::NoActiveTurn(input));
};
if let Some(expected_turn_id) = expected_turn_id
&& expected_turn_id != active_turn_id
{
return Err(SteerInputError::ExpectedTurnMismatch {
expected: expected_turn_id.to_string(),
actual: active_turn_id.clone(),
});
}
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
Ok(active_turn_id.clone())
}
/// Returns the input if there was no task running to inject into
@ -2716,6 +2753,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
mod handlers {
use crate::codex::Session;
use crate::codex::SessionSettingsUpdate;
use crate::codex::SteerInputError;
use crate::codex::TurnContext;
use crate::codex::spawn_review_thread;
@ -2850,8 +2888,8 @@ mod handlers {
};
current_context.otel_manager.user_prompt(&items);
// Attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
// Attempt to inject input into current task.
if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await {
sess.seed_initial_context_if_needed(&current_context).await;
let resumed_model = sess.take_pending_resume_previous_model().await;
let update_items = sess.build_settings_update_items(
@ -6123,6 +6161,89 @@ mod tests {
);
}
#[tokio::test]
async fn steer_input_requires_active_turn() {
let (sess, _tc, _rx) = make_session_and_context_with_rx().await;
let input = vec![UserInput::Text {
text: "steer".to_string(),
text_elements: Vec::new(),
}];
let err = sess
.steer_input(input, None)
.await
.expect_err("steering without active turn should fail");
assert!(matches!(err, SteerInputError::NoActiveTurn(_)));
}
#[tokio::test]
async fn steer_input_enforces_expected_turn_id() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
sess.spawn_task(
Arc::clone(&tc),
input,
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: false,
},
)
.await;
let steer_input = vec![UserInput::Text {
text: "steer".to_string(),
text_elements: Vec::new(),
}];
let err = sess
.steer_input(steer_input, Some("different-turn-id"))
.await
.expect_err("mismatched expected turn id should fail");
match err {
SteerInputError::ExpectedTurnMismatch { expected, actual } => {
assert_eq!(
(expected, actual),
("different-turn-id".to_string(), tc.sub_id.clone())
);
}
other => panic!("unexpected error: {other:?}"),
}
}
#[tokio::test]
async fn steer_input_returns_active_turn_id() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}];
sess.spawn_task(
Arc::clone(&tc),
input,
NeverEndingTask {
kind: TaskKind::Regular,
listen_to_cancellation_token: false,
},
)
.await;
let steer_input = vec![UserInput::Text {
text: "steer".to_string(),
text_elements: Vec::new(),
}];
let turn_id = sess
.steer_input(steer_input, Some(&tc.sub_id))
.await
.expect("steering with matching expected turn id should succeed");
assert_eq!(turn_id, tc.sub_id);
assert!(sess.has_pending_input().await);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn abort_review_task_emits_exited_then_aborted_and_records_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;

View file

@ -1,5 +1,6 @@
use crate::agent::AgentStatus;
use crate::codex::Codex;
use crate::codex::SteerInputError;
use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::Op;
@ -9,6 +10,7 @@ use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
use std::path::PathBuf;
use tokio::sync::watch;
@ -45,6 +47,14 @@ impl CodexThread {
self.codex.submit(op).await
}
pub async fn steer_input(
&self,
input: Vec<UserInput>,
expected_turn_id: Option<&str>,
) -> Result<String, SteerInputError> {
self.codex.steer_input(input, expected_turn_id).await
}
/// Use sparingly: this is intended to be removed soon.
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
self.codex.submit_with_id(sub).await

View file

@ -13,6 +13,7 @@ pub mod bash;
mod client;
mod client_common;
pub mod codex;
pub use codex::SteerInputError;
mod codex_thread;
mod compact_remote;
pub use codex_thread::CodexThread;