mirror of
https://github.com/nearai/ironclaw.git
synced 2026-06-09 03:24:59 +08:00
fix(mcp): three review findings on MCP registry / process / startup paths
1. McpProcessManager now partitions stdio children by (user_id,
server_name). Previously `transports` and `configs` were keyed by
`server_name` only, so a second user activating the same stdio MCP
server would overwrite the prior user's transport handle in the
map, leaving the prior child process orphaned. The Arc in the
prior user's `McpClient` kept the process alive for dispatch, but
`shutdown_all` / `try_restart` / `managed_servers` all lost
visibility of it. Added `McpProcessKey(user_id, server_name)` +
threaded `user_id` through spawn / shutdown / restart / get /
managed_servers, mirroring the `McpClientStore` partitioning from
d93243b7. Factory.rs and the single main.rs caller updated.
2. Startup MCP client injection in src/app.rs was passing the raw
config-row `server.name` (hyphens preserved) while the created
client and wrappers had already been normalized to underscores by
`create_client_from_config`. Result: the client landed in
McpClientStore under "my-mcp-server" while wrappers looked up
"my_mcp_server" at dispatch — every tool call failed with
"MCP server '…' is not active for this user" until manual
reactivation. Source the name from `client.server_name()` (the
already-normalized canonical field) so the insert key matches the
dispatch-time lookup key.
3. activate_mcp in src/extensions/manager.rs now performs the
tool-surface conflict check BEFORE persisting
`updated_server.cached_tools`. Previously the cache write happened
first; if the conflict check then rejected, the server's
persisted `cached_tools` still contained the new surface, and
`latent_provider_actions()` advertised tools from a backend that
couldn't actually be activated for this user.
This commit is contained in:
25
src/app.rs
25
src/app.rs
@@ -1014,11 +1014,32 @@ impl AppBuilder {
|
||||
"Injecting startup MCP clients into extension manager"
|
||||
);
|
||||
for (name, client) in startup_mcp_clients {
|
||||
// `name` here is the raw config row's `server.name`
|
||||
// captured before `create_client_from_config()`
|
||||
// normalized hyphens to underscores. The client
|
||||
// itself, the generated wrappers, and the session /
|
||||
// process managers all use the NORMALIZED name.
|
||||
// Using the raw `name` here would insert the client
|
||||
// into `McpClientStore` under `"my-mcp-server"`
|
||||
// while the wrappers look up `"my_mcp_server"` at
|
||||
// dispatch, silently failing every call with
|
||||
// "MCP server '…' is not active for this user"
|
||||
// until manual reactivation. Source the name from
|
||||
// the client's canonical field to guarantee the
|
||||
// insert key matches the dispatch-time lookup key.
|
||||
let normalized_name = client.server_name().to_string();
|
||||
let registered = manager
|
||||
.inject_mcp_client(name.clone(), &self.config.owner_id, client)
|
||||
.inject_mcp_client(normalized_name.clone(), &self.config.owner_id, client)
|
||||
.await;
|
||||
if name != normalized_name {
|
||||
tracing::debug!(
|
||||
raw_name = %name,
|
||||
normalized = %normalized_name,
|
||||
"Startup MCP server name normalized (hyphens -> underscores) for client-store injection"
|
||||
);
|
||||
}
|
||||
tracing::debug!(
|
||||
server = %name,
|
||||
server = %normalized_name,
|
||||
count = registered.len(),
|
||||
"Registered tools for startup MCP server"
|
||||
);
|
||||
|
||||
@@ -5574,12 +5574,6 @@ impl ExtensionManager {
|
||||
}
|
||||
})?;
|
||||
|
||||
let mut updated_server = server.clone();
|
||||
updated_server.cached_tools = mcp_tools.clone();
|
||||
self.update_mcp_server(updated_server, user_id)
|
||||
.await
|
||||
.map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?;
|
||||
|
||||
// Before registering any tool wrappers for this user, fingerprint
|
||||
// the tool surface the server reported and reject activation if
|
||||
// another user already has the same `name` active with a
|
||||
@@ -5591,6 +5585,14 @@ impl ExtensionManager {
|
||||
// partitioning of the client store addressed the runtime
|
||||
// dispatch leak, but the registry surface was still global and
|
||||
// susceptible to the same cross-tenant leak.
|
||||
//
|
||||
// CRITICAL: this check must run BEFORE persisting
|
||||
// `cached_tools` on the server row. `latent_provider_actions()`
|
||||
// surfaces `server.cached_tools` for inactive MCP servers, so
|
||||
// writing them first and then rejecting would leave the
|
||||
// affected user seeing tool names and schemas from a backend
|
||||
// that cannot be activated while the other user owns the
|
||||
// shared server name.
|
||||
let surface_signature = crate::tools::mcp::surface_signature(&mcp_tools);
|
||||
if let Some(other_user) = self
|
||||
.mcp_clients
|
||||
@@ -5605,6 +5607,12 @@ impl ExtensionManager {
|
||||
)));
|
||||
}
|
||||
|
||||
let mut updated_server = server.clone();
|
||||
updated_server.cached_tools = mcp_tools.clone();
|
||||
self.update_mcp_server(updated_server, user_id)
|
||||
.await
|
||||
.map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?;
|
||||
|
||||
// Store the client for this user first, then register the
|
||||
// (user-agnostic) tool wrappers. The wrappers resolve the caller's
|
||||
// client at dispatch time from the shared `McpClientStore`, so the
|
||||
|
||||
@@ -64,7 +64,13 @@ pub async fn create_client_from_config(
|
||||
match server.effective_transport() {
|
||||
EffectiveTransport::Stdio { command, args, env } => {
|
||||
let transport = process_manager
|
||||
.spawn_stdio(validated_name.as_str(), command, args.to_vec(), env.clone())
|
||||
.spawn_stdio(
|
||||
user_id,
|
||||
validated_name.as_str(),
|
||||
command,
|
||||
args.to_vec(),
|
||||
env.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| McpFactoryError::StdioSpawn {
|
||||
name: server_name.clone(),
|
||||
|
||||
@@ -21,12 +21,40 @@ pub struct StdioSpawnConfig {
|
||||
pub env: HashMap<String, String>,
|
||||
}
|
||||
|
||||
/// Composite key for a stdio MCP child process: the activating user
|
||||
/// plus the server name. Both fields participate in `Hash` / `Eq` so
|
||||
/// two users activating the same server name each get — and keep —
|
||||
/// their own child process instead of one silently overwriting the
|
||||
/// other's transport handle.
|
||||
///
|
||||
/// Stdio MCP servers receive credentials via their spawn `env` map, so
|
||||
/// sharing a single child across users would leak one tenant's
|
||||
/// credentials to the other's dispatches. Per-user children are
|
||||
/// required; the process manager must track them independently.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct McpProcessKey {
|
||||
pub user_id: String,
|
||||
pub server_name: String,
|
||||
}
|
||||
|
||||
impl McpProcessKey {
|
||||
pub fn new(user_id: &str, server_name: &str) -> Self {
|
||||
Self {
|
||||
user_id: user_id.to_string(),
|
||||
server_name: server_name.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Manages stdio MCP server processes.
|
||||
///
|
||||
/// Handles spawning, tracking, and shutdown of child processes.
|
||||
/// Handles spawning, tracking, and shutdown of child processes. Keyed
|
||||
/// by `(user_id, server_name)` so that multiple tenants activating
|
||||
/// the same server name end up with distinct, independently tracked
|
||||
/// child processes — see `McpProcessKey` for the rationale.
|
||||
pub struct McpProcessManager {
|
||||
transports: RwLock<HashMap<String, Arc<StdioMcpTransport>>>,
|
||||
configs: RwLock<HashMap<String, StdioSpawnConfig>>,
|
||||
transports: RwLock<HashMap<McpProcessKey, Arc<StdioMcpTransport>>>,
|
||||
configs: RwLock<HashMap<McpProcessKey, StdioSpawnConfig>>,
|
||||
}
|
||||
|
||||
impl McpProcessManager {
|
||||
@@ -37,9 +65,14 @@ impl McpProcessManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a new stdio MCP server process.
|
||||
/// Spawn a new stdio MCP server process for `(user_id,
|
||||
/// server_name)`. If an entry already exists for the same pair
|
||||
/// (same-user re-activation), the existing process is shut down
|
||||
/// first so the replacement doesn't leave an orphan. Other users'
|
||||
/// processes on the same `server_name` are untouched.
|
||||
pub async fn spawn_stdio(
|
||||
&self,
|
||||
user_id: &str,
|
||||
name: impl Into<String>,
|
||||
command: impl Into<String>,
|
||||
args: Vec<String>,
|
||||
@@ -47,10 +80,25 @@ impl McpProcessManager {
|
||||
) -> Result<Arc<StdioMcpTransport>, ToolError> {
|
||||
let name = name.into();
|
||||
let command = command.into();
|
||||
let key = McpProcessKey::new(user_id, &name);
|
||||
|
||||
// Same-user re-activation: shut the previous child down before
|
||||
// the new one takes its slot so the old process doesn't become
|
||||
// an orphan.
|
||||
if let Some(old_transport) = self.transports.write().await.remove(&key)
|
||||
&& let Err(e) = old_transport.shutdown().await
|
||||
{
|
||||
tracing::warn!(
|
||||
user_id = %user_id,
|
||||
server = %name,
|
||||
error = %e,
|
||||
"Failed to shut down previous stdio MCP child before replacement"
|
||||
);
|
||||
}
|
||||
|
||||
// Store config for potential restart
|
||||
self.configs.write().await.insert(
|
||||
name.clone(),
|
||||
key.clone(),
|
||||
StdioSpawnConfig {
|
||||
command: command.clone(),
|
||||
args: args.clone(),
|
||||
@@ -63,61 +111,77 @@ impl McpProcessManager {
|
||||
self.transports
|
||||
.write()
|
||||
.await
|
||||
.insert(name, Arc::clone(&transport));
|
||||
.insert(key, Arc::clone(&transport));
|
||||
|
||||
Ok(transport)
|
||||
}
|
||||
|
||||
/// Get a transport by server name.
|
||||
pub async fn get(&self, name: &str) -> Option<Arc<StdioMcpTransport>> {
|
||||
self.transports.read().await.get(name).cloned()
|
||||
/// Get a transport by `(user_id, server_name)`.
|
||||
pub async fn get(&self, user_id: &str, name: &str) -> Option<Arc<StdioMcpTransport>> {
|
||||
self.transports
|
||||
.read()
|
||||
.await
|
||||
.get(&McpProcessKey::new(user_id, name))
|
||||
.cloned()
|
||||
}
|
||||
|
||||
/// Shut down all managed transports.
|
||||
pub async fn shutdown_all(&self) {
|
||||
let transports: Vec<(String, Arc<StdioMcpTransport>)> = {
|
||||
let transports: Vec<(McpProcessKey, Arc<StdioMcpTransport>)> = {
|
||||
let mut map = self.transports.write().await;
|
||||
map.drain().collect()
|
||||
};
|
||||
|
||||
for (name, transport) in transports {
|
||||
for (key, transport) in transports {
|
||||
if let Err(e) = transport.shutdown().await {
|
||||
tracing::warn!("Failed to shut down MCP stdio server '{}': {}", name, e);
|
||||
tracing::warn!(
|
||||
user_id = %key.user_id,
|
||||
server = %key.server_name,
|
||||
error = %e,
|
||||
"Failed to shut down MCP stdio server",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shut down a specific transport by name.
|
||||
pub async fn shutdown(&self, name: &str) -> Result<(), ToolError> {
|
||||
let transport = self.transports.write().await.remove(name);
|
||||
/// Shut down the transport for `(user_id, server_name)`.
|
||||
pub async fn shutdown(&self, user_id: &str, name: &str) -> Result<(), ToolError> {
|
||||
let key = McpProcessKey::new(user_id, name);
|
||||
let transport = self.transports.write().await.remove(&key);
|
||||
|
||||
if let Some(transport) = transport {
|
||||
transport.shutdown().await?;
|
||||
}
|
||||
|
||||
self.configs.write().await.remove(name);
|
||||
self.configs.write().await.remove(&key);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Attempt to restart a crashed transport with exponential backoff.
|
||||
/// Attempt to restart a crashed transport for `(user_id,
|
||||
/// server_name)` with exponential backoff.
|
||||
///
|
||||
/// Tries up to 5 times with delays of 1s, 2s, 4s, 8s, 16s (total: 31s max wait).
|
||||
pub async fn try_restart(&self, name: &str) -> Result<Arc<StdioMcpTransport>, ToolError> {
|
||||
pub async fn try_restart(
|
||||
&self,
|
||||
user_id: &str,
|
||||
name: &str,
|
||||
) -> Result<Arc<StdioMcpTransport>, ToolError> {
|
||||
let key = McpProcessKey::new(user_id, name);
|
||||
let config = self
|
||||
.configs
|
||||
.read()
|
||||
.await
|
||||
.get(name)
|
||||
.get(&key)
|
||||
.cloned()
|
||||
.ok_or_else(|| {
|
||||
ToolError::ExternalService(format!(
|
||||
"No spawn config for MCP server '{}', cannot restart",
|
||||
name
|
||||
"No spawn config for MCP server '{}' (user {}), cannot restart",
|
||||
name, user_id
|
||||
))
|
||||
})?;
|
||||
|
||||
// Shut down and remove old transport to avoid orphaning a wedged process.
|
||||
if let Some(old_transport) = self.transports.write().await.remove(name) {
|
||||
if let Some(old_transport) = self.transports.write().await.remove(&key) {
|
||||
let _ = old_transport.shutdown().await;
|
||||
}
|
||||
|
||||
@@ -141,20 +205,22 @@ impl McpProcessManager {
|
||||
self.transports
|
||||
.write()
|
||||
.await
|
||||
.insert(name.to_string(), Arc::clone(&transport));
|
||||
.insert(key.clone(), Arc::clone(&transport));
|
||||
tracing::info!(
|
||||
"MCP stdio server '{}' restarted after {} attempt(s)",
|
||||
name,
|
||||
user_id = %user_id,
|
||||
server = %name,
|
||||
"MCP stdio server restarted after {} attempt(s)",
|
||||
attempt + 1
|
||||
);
|
||||
return Ok(transport);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Restart attempt {}/{} for MCP server '{}' failed: {}",
|
||||
user_id = %user_id,
|
||||
server = %name,
|
||||
"Restart attempt {}/{} failed: {}",
|
||||
attempt + 1,
|
||||
max_retries,
|
||||
name,
|
||||
e
|
||||
);
|
||||
last_err = Some(e);
|
||||
@@ -164,14 +230,14 @@ impl McpProcessManager {
|
||||
|
||||
Err(last_err.unwrap_or_else(|| {
|
||||
ToolError::ExternalService(format!(
|
||||
"Failed to restart MCP server '{}' after {} attempts",
|
||||
name, max_retries
|
||||
"Failed to restart MCP server '{}' (user {}) after {} attempts",
|
||||
name, user_id, max_retries
|
||||
))
|
||||
}))
|
||||
}
|
||||
|
||||
/// Get names of all managed transports.
|
||||
pub async fn managed_servers(&self) -> Vec<String> {
|
||||
/// Get `(user_id, server_name)` pairs of all managed transports.
|
||||
pub async fn managed_servers(&self) -> Vec<McpProcessKey> {
|
||||
self.transports.read().await.keys().cloned().collect()
|
||||
}
|
||||
}
|
||||
@@ -203,4 +269,16 @@ mod tests {
|
||||
let manager = McpProcessManager::new();
|
||||
manager.shutdown_all().await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process_key_partitions_by_user_and_server() {
|
||||
let k1 = McpProcessKey::new("user-a", "stdio_server");
|
||||
let k2 = McpProcessKey::new("user-b", "stdio_server");
|
||||
let k3 = McpProcessKey::new("user-a", "other_server");
|
||||
let k1_dup = McpProcessKey::new("user-a", "stdio_server");
|
||||
|
||||
assert_ne!(k1, k2, "different users on same server must not collide");
|
||||
assert_ne!(k1, k3, "same user on different servers must not collide");
|
||||
assert_eq!(k1, k1_dup, "same (user, server) must be equal");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user