[apps] Add thread_id param to optionally load thread config for apps feature check. (#11279)
- [x] Add thread_id param to optionally load thread config for apps feature check
This commit is contained in:
parent
503186b31f
commit
005e040f97
11 changed files with 186 additions and 1 deletions
|
|
@ -42,6 +42,13 @@
|
|||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"threadId": {
|
||||
"description": "Optional thread id used to evaluate app feature gating from that thread's config.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"type": "object"
|
||||
|
|
|
|||
|
|
@ -10083,6 +10083,13 @@
|
|||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"threadId": {
|
||||
"description": "Optional thread id used to evaluate app feature gating from that thread's config.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"title": "AppsListParams",
|
||||
|
|
|
|||
|
|
@ -21,6 +21,13 @@
|
|||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"threadId": {
|
||||
"description": "Optional thread id used to evaluate app feature gating from that thread's config.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"title": "AppsListParams",
|
||||
|
|
|
|||
|
|
@ -14,6 +14,10 @@ cursor?: string | null,
|
|||
* Optional page size; defaults to a reasonable server-side value.
|
||||
*/
|
||||
limit?: number | null,
|
||||
/**
|
||||
* Optional thread id used to evaluate app feature gating from that thread's config.
|
||||
*/
|
||||
threadId?: string | null,
|
||||
/**
|
||||
* When true, bypass app caches and fetch the latest data from sources.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -1107,6 +1107,27 @@ mod tests {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_list_apps() -> Result<()> {
|
||||
let request = ClientRequest::AppsList {
|
||||
request_id: RequestId::Integer(8),
|
||||
params: v2::AppsListParams::default(),
|
||||
};
|
||||
assert_eq!(
|
||||
json!({
|
||||
"method": "app/list",
|
||||
"id": 8,
|
||||
"params": {
|
||||
"cursor": null,
|
||||
"limit": null,
|
||||
"threadId": null
|
||||
}
|
||||
}),
|
||||
serde_json::to_value(&request)?,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serialize_list_experimental_features() -> Result<()> {
|
||||
let request = ClientRequest::ExperimentalFeatureList {
|
||||
|
|
|
|||
|
|
@ -1201,6 +1201,9 @@ pub struct AppsListParams {
|
|||
/// Optional page size; defaults to a reasonable server-side value.
|
||||
#[ts(optional = nullable)]
|
||||
pub limit: Option<u32>,
|
||||
/// Optional thread id used to evaluate app feature gating from that thread's config.
|
||||
#[ts(optional = nullable)]
|
||||
pub thread_id: Option<String>,
|
||||
/// When true, bypass app caches and fetch the latest data from sources.
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
pub force_refetch: bool,
|
||||
|
|
|
|||
|
|
@ -722,6 +722,7 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat
|
|||
{ "method": "app/list", "id": 50, "params": {
|
||||
"cursor": null,
|
||||
"limit": 50,
|
||||
"threadId": "thr_123",
|
||||
"forceRefetch": false
|
||||
} }
|
||||
{ "id": 50, "result": {
|
||||
|
|
@ -741,6 +742,8 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat
|
|||
} }
|
||||
```
|
||||
|
||||
When `threadId` is provided, app feature gating (`Feature::Apps`) is evaluated using that thread's config snapshot. When omitted, the latest global config is used.
|
||||
|
||||
`app/list` returns after both accessible apps and directory apps are loaded. Set `forceRefetch: true` to bypass app caches and fetch fresh data from sources. Cache entries are only replaced when those refetches succeed.
|
||||
|
||||
The server also emits `app/list/updated` notifications whenever either source (accessible apps or directory apps) finishes loading. Each notification includes the latest merged app list.
|
||||
|
|
|
|||
|
|
@ -4368,7 +4368,7 @@ impl CodexMessageProcessor {
|
|||
}
|
||||
|
||||
async fn apps_list(&self, request_id: ConnectionRequestId, params: AppsListParams) {
|
||||
let config = match self.load_latest_config().await {
|
||||
let mut config = match self.load_latest_config().await {
|
||||
Ok(config) => config,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
|
|
@ -4376,6 +4376,22 @@ impl CodexMessageProcessor {
|
|||
}
|
||||
};
|
||||
|
||||
if let Some(thread_id) = params.thread_id.as_deref() {
|
||||
let (_, thread) = match self.load_thread(thread_id).await {
|
||||
Ok(result) => result,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if thread.enabled(Feature::Apps) {
|
||||
config.features.enable(Feature::Apps);
|
||||
} else {
|
||||
config.features.disable(Feature::Apps);
|
||||
}
|
||||
}
|
||||
|
||||
if !config.features.enabled(Feature::Apps) {
|
||||
self.outgoing
|
||||
.send_response(
|
||||
|
|
@ -4405,6 +4421,7 @@ impl CodexMessageProcessor {
|
|||
let AppsListParams {
|
||||
cursor,
|
||||
limit,
|
||||
thread_id: _,
|
||||
force_refetch,
|
||||
} = params;
|
||||
let start = match cursor {
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ use codex_app_server_protocol::JSONRPCError;
|
|||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_core::auth::AuthCredentialsStoreMode;
|
||||
use pretty_assertions::assert_eq;
|
||||
use rmcp::handler::server::ServerHandler;
|
||||
|
|
@ -55,6 +57,7 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> {
|
|||
.send_apps_list_request(AppsListParams {
|
||||
limit: Some(50),
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -72,6 +75,103 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_apps_uses_thread_feature_flag_when_thread_id_is_provided() -> Result<()> {
|
||||
let connectors = vec![AppInfo {
|
||||
id: "beta".to_string(),
|
||||
name: "Beta".to_string(),
|
||||
description: Some("Beta connector".to_string()),
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
install_url: None,
|
||||
is_accessible: false,
|
||||
}];
|
||||
let tools = vec![connector_tool("beta", "Beta App")?];
|
||||
let (server_url, server_handle) =
|
||||
start_apps_server_with_delays(connectors, tools, Duration::ZERO, Duration::ZERO).await?;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
write_connectors_config(codex_home.path(), &server_url)?;
|
||||
write_chatgpt_auth(
|
||||
codex_home.path(),
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
.account_id("account-123")
|
||||
.chatgpt_user_id("user-123")
|
||||
.chatgpt_account_id("account-123"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let start_request = mcp
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
let start_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(start_request)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response(start_response)?;
|
||||
|
||||
std::fs::write(
|
||||
codex_home.path().join("config.toml"),
|
||||
format!(
|
||||
r#"
|
||||
chatgpt_base_url = "{server_url}"
|
||||
|
||||
[features]
|
||||
connectors = false
|
||||
"#
|
||||
),
|
||||
)?;
|
||||
|
||||
let global_request = mcp
|
||||
.send_apps_list_request(AppsListParams {
|
||||
limit: None,
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
let global_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(global_request)),
|
||||
)
|
||||
.await??;
|
||||
let AppsListResponse {
|
||||
data: global_data,
|
||||
next_cursor: global_next_cursor,
|
||||
} = to_response(global_response)?;
|
||||
assert!(global_data.is_empty());
|
||||
assert!(global_next_cursor.is_none());
|
||||
|
||||
let thread_request = mcp
|
||||
.send_apps_list_request(AppsListParams {
|
||||
limit: None,
|
||||
cursor: None,
|
||||
thread_id: Some(thread.id),
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
let thread_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_request)),
|
||||
)
|
||||
.await??;
|
||||
let AppsListResponse {
|
||||
data: thread_data,
|
||||
next_cursor: thread_next_cursor,
|
||||
} = to_response(thread_response)?;
|
||||
assert!(thread_data.iter().any(|app| app.id == "beta"));
|
||||
assert!(thread_next_cursor.is_none());
|
||||
|
||||
server_handle.abort();
|
||||
let _ = server_handle.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<()> {
|
||||
let connectors = vec![
|
||||
|
|
@ -124,6 +224,7 @@ async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<(
|
|||
.send_apps_list_request(AppsListParams {
|
||||
limit: None,
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -238,6 +339,7 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
|
|||
.send_apps_list_request(AppsListParams {
|
||||
limit: None,
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -360,6 +462,7 @@ async fn list_apps_paginates_results() -> Result<()> {
|
|||
.send_apps_list_request(AppsListParams {
|
||||
limit: Some(1),
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -398,6 +501,7 @@ async fn list_apps_paginates_results() -> Result<()> {
|
|||
.send_apps_list_request(AppsListParams {
|
||||
limit: Some(1),
|
||||
cursor: Some(next_cursor),
|
||||
thread_id: None,
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -463,6 +567,7 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result
|
|||
.send_apps_list_request(AppsListParams {
|
||||
limit: None,
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -492,6 +597,7 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result
|
|||
.send_apps_list_request(AppsListParams {
|
||||
limit: None,
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: true,
|
||||
})
|
||||
.await?;
|
||||
|
|
@ -506,6 +612,7 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result
|
|||
.send_apps_list_request(AppsListParams {
|
||||
limit: None,
|
||||
cursor: None,
|
||||
thread_id: None,
|
||||
force_refetch: false,
|
||||
})
|
||||
.await?;
|
||||
|
|
|
|||
|
|
@ -488,6 +488,10 @@ impl Codex {
|
|||
pub(crate) fn state_db(&self) -> Option<state_db::StateDbHandle> {
|
||||
self.session.state_db()
|
||||
}
|
||||
|
||||
pub(crate) fn enabled(&self, feature: Feature) -> bool {
|
||||
self.session.enabled(feature)
|
||||
}
|
||||
}
|
||||
|
||||
/// Context for an initialized model agent
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use crate::agent::AgentStatus;
|
|||
use crate::codex::Codex;
|
||||
use crate::codex::SteerInputError;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::features::Feature;
|
||||
use crate::protocol::Event;
|
||||
use crate::protocol::Op;
|
||||
use crate::protocol::Submission;
|
||||
|
|
@ -83,4 +84,8 @@ impl CodexThread {
|
|||
pub async fn config_snapshot(&self) -> ThreadConfigSnapshot {
|
||||
self.codex.thread_config_snapshot().await
|
||||
}
|
||||
|
||||
pub fn enabled(&self, feature: Feature) -> bool {
|
||||
self.codex.enabled(feature)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue