fix(mcp): three review findings on MCP registry / process / startup paths

1. McpProcessManager now partitions stdio children by (user_id,
   server_name). Previously `transports` and `configs` were keyed by
   `server_name` only, so a second user activating the same stdio MCP
   server would overwrite the prior user's transport handle in the
   map, leaving the prior child process orphaned. The Arc in the
   prior user's `McpClient` kept the process alive for dispatch, but
   `shutdown_all` / `try_restart` / `managed_servers` all lost
   visibility of it. Added `McpProcessKey(user_id, server_name)` +
   threaded `user_id` through spawn / shutdown / restart / get /
   managed_servers, mirroring the `McpClientStore` partitioning from
   d93243b7. Factory.rs and the single main.rs caller updated.

2. Startup MCP client injection in src/app.rs was passing the raw
   config-row `server.name` (hyphens preserved) while the created
   client and wrappers had already been normalized to underscores by
   `create_client_from_config`. Result: the client landed in
   McpClientStore under "my-mcp-server" while wrappers looked up
   "my_mcp_server" at dispatch — every tool call failed with
   "MCP server '…' is not active for this user" until manual
   reactivation. Source the name from `client.server_name()` (the
   already-normalized canonical field) so the insert key matches the
   dispatch-time lookup key.

3. activate_mcp in src/extensions/manager.rs now performs the
   tool-surface conflict check BEFORE persisting
   `updated_server.cached_tools`. Previously the cache write happened
   first; if the conflict check then rejected, the server's
   persisted `cached_tools` still contained the new surface, and
   `latent_provider_actions()` advertised tools from a backend that
   couldn't actually be activated for this user.
This commit is contained in:
Nikolay Pismenkov
2026-04-21 20:24:17 -07:00
parent 63b79fa411
commit 13b76380b9
4 changed files with 153 additions and 40 deletions

View File

@@ -1014,11 +1014,32 @@ impl AppBuilder {
"Injecting startup MCP clients into extension manager"
);
for (name, client) in startup_mcp_clients {
// `name` here is the raw config row's `server.name`
// captured before `create_client_from_config()`
// normalized hyphens to underscores. The client
// itself, the generated wrappers, and the session /
// process managers all use the NORMALIZED name.
// Using the raw `name` here would insert the client
// into `McpClientStore` under `"my-mcp-server"`
// while the wrappers look up `"my_mcp_server"` at
// dispatch, silently failing every call with
// "MCP server '…' is not active for this user"
// until manual reactivation. Source the name from
// the client's canonical field to guarantee the
// insert key matches the dispatch-time lookup key.
let normalized_name = client.server_name().to_string();
let registered = manager
.inject_mcp_client(name.clone(), &self.config.owner_id, client)
.inject_mcp_client(normalized_name.clone(), &self.config.owner_id, client)
.await;
if name != normalized_name {
tracing::debug!(
raw_name = %name,
normalized = %normalized_name,
"Startup MCP server name normalized (hyphens -> underscores) for client-store injection"
);
}
tracing::debug!(
server = %name,
server = %normalized_name,
count = registered.len(),
"Registered tools for startup MCP server"
);

View File

@@ -5574,12 +5574,6 @@ impl ExtensionManager {
}
})?;
let mut updated_server = server.clone();
updated_server.cached_tools = mcp_tools.clone();
self.update_mcp_server(updated_server, user_id)
.await
.map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?;
// Before registering any tool wrappers for this user, fingerprint
// the tool surface the server reported and reject activation if
// another user already has the same `name` active with a
@@ -5591,6 +5585,14 @@ impl ExtensionManager {
// partitioning of the client store addressed the runtime
// dispatch leak, but the registry surface was still global and
// susceptible to the same cross-tenant leak.
//
// CRITICAL: this check must run BEFORE persisting
// `cached_tools` on the server row. `latent_provider_actions()`
// surfaces `server.cached_tools` for inactive MCP servers, so
// writing them first and then rejecting would leave the
// affected user seeing tool names and schemas from a backend
// that cannot be activated while the other user owns the
// shared server name.
let surface_signature = crate::tools::mcp::surface_signature(&mcp_tools);
if let Some(other_user) = self
.mcp_clients
@@ -5605,6 +5607,12 @@ impl ExtensionManager {
)));
}
let mut updated_server = server.clone();
updated_server.cached_tools = mcp_tools.clone();
self.update_mcp_server(updated_server, user_id)
.await
.map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?;
// Store the client for this user first, then register the
// (user-agnostic) tool wrappers. The wrappers resolve the caller's
// client at dispatch time from the shared `McpClientStore`, so the

View File

@@ -64,7 +64,13 @@ pub async fn create_client_from_config(
match server.effective_transport() {
EffectiveTransport::Stdio { command, args, env } => {
let transport = process_manager
.spawn_stdio(validated_name.as_str(), command, args.to_vec(), env.clone())
.spawn_stdio(
user_id,
validated_name.as_str(),
command,
args.to_vec(),
env.clone(),
)
.await
.map_err(|e| McpFactoryError::StdioSpawn {
name: server_name.clone(),

View File

@@ -21,12 +21,40 @@ pub struct StdioSpawnConfig {
pub env: HashMap<String, String>,
}
/// Composite key for a stdio MCP child process: the activating user
/// plus the server name. Both fields participate in `Hash` / `Eq` so
/// two users activating the same server name each get — and keep —
/// their own child process instead of one silently overwriting the
/// other's transport handle.
///
/// Stdio MCP servers receive credentials via their spawn `env` map, so
/// sharing a single child across users would leak one tenant's
/// credentials to the other's dispatches. Per-user children are
/// required; the process manager must track them independently.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct McpProcessKey {
pub user_id: String,
pub server_name: String,
}
impl McpProcessKey {
pub fn new(user_id: &str, server_name: &str) -> Self {
Self {
user_id: user_id.to_string(),
server_name: server_name.to_string(),
}
}
}
/// Manages stdio MCP server processes.
///
/// Handles spawning, tracking, and shutdown of child processes.
/// Handles spawning, tracking, and shutdown of child processes. Keyed
/// by `(user_id, server_name)` so that multiple tenants activating
/// the same server name end up with distinct, independently tracked
/// child processes — see `McpProcessKey` for the rationale.
pub struct McpProcessManager {
transports: RwLock<HashMap<String, Arc<StdioMcpTransport>>>,
configs: RwLock<HashMap<String, StdioSpawnConfig>>,
transports: RwLock<HashMap<McpProcessKey, Arc<StdioMcpTransport>>>,
configs: RwLock<HashMap<McpProcessKey, StdioSpawnConfig>>,
}
impl McpProcessManager {
@@ -37,9 +65,14 @@ impl McpProcessManager {
}
}
/// Spawn a new stdio MCP server process.
/// Spawn a new stdio MCP server process for `(user_id,
/// server_name)`. If an entry already exists for the same pair
/// (same-user re-activation), the existing process is shut down
/// first so the replacement doesn't leave an orphan. Other users'
/// processes on the same `server_name` are untouched.
pub async fn spawn_stdio(
&self,
user_id: &str,
name: impl Into<String>,
command: impl Into<String>,
args: Vec<String>,
@@ -47,10 +80,25 @@ impl McpProcessManager {
) -> Result<Arc<StdioMcpTransport>, ToolError> {
let name = name.into();
let command = command.into();
let key = McpProcessKey::new(user_id, &name);
// Same-user re-activation: shut the previous child down before
// the new one takes its slot so the old process doesn't become
// an orphan.
if let Some(old_transport) = self.transports.write().await.remove(&key)
&& let Err(e) = old_transport.shutdown().await
{
tracing::warn!(
user_id = %user_id,
server = %name,
error = %e,
"Failed to shut down previous stdio MCP child before replacement"
);
}
// Store config for potential restart
self.configs.write().await.insert(
name.clone(),
key.clone(),
StdioSpawnConfig {
command: command.clone(),
args: args.clone(),
@@ -63,61 +111,77 @@ impl McpProcessManager {
self.transports
.write()
.await
.insert(name, Arc::clone(&transport));
.insert(key, Arc::clone(&transport));
Ok(transport)
}
/// Get a transport by server name.
pub async fn get(&self, name: &str) -> Option<Arc<StdioMcpTransport>> {
self.transports.read().await.get(name).cloned()
/// Get a transport by `(user_id, server_name)`.
pub async fn get(&self, user_id: &str, name: &str) -> Option<Arc<StdioMcpTransport>> {
self.transports
.read()
.await
.get(&McpProcessKey::new(user_id, name))
.cloned()
}
/// Shut down all managed transports.
pub async fn shutdown_all(&self) {
let transports: Vec<(String, Arc<StdioMcpTransport>)> = {
let transports: Vec<(McpProcessKey, Arc<StdioMcpTransport>)> = {
let mut map = self.transports.write().await;
map.drain().collect()
};
for (name, transport) in transports {
for (key, transport) in transports {
if let Err(e) = transport.shutdown().await {
tracing::warn!("Failed to shut down MCP stdio server '{}': {}", name, e);
tracing::warn!(
user_id = %key.user_id,
server = %key.server_name,
error = %e,
"Failed to shut down MCP stdio server",
);
}
}
}
/// Shut down a specific transport by name.
pub async fn shutdown(&self, name: &str) -> Result<(), ToolError> {
let transport = self.transports.write().await.remove(name);
/// Shut down the transport for `(user_id, server_name)`.
pub async fn shutdown(&self, user_id: &str, name: &str) -> Result<(), ToolError> {
let key = McpProcessKey::new(user_id, name);
let transport = self.transports.write().await.remove(&key);
if let Some(transport) = transport {
transport.shutdown().await?;
}
self.configs.write().await.remove(name);
self.configs.write().await.remove(&key);
Ok(())
}
/// Attempt to restart a crashed transport with exponential backoff.
/// Attempt to restart a crashed transport for `(user_id,
/// server_name)` with exponential backoff.
///
/// Tries up to 5 times with delays of 1s, 2s, 4s, 8s, 16s (total: 31s max wait).
pub async fn try_restart(&self, name: &str) -> Result<Arc<StdioMcpTransport>, ToolError> {
pub async fn try_restart(
&self,
user_id: &str,
name: &str,
) -> Result<Arc<StdioMcpTransport>, ToolError> {
let key = McpProcessKey::new(user_id, name);
let config = self
.configs
.read()
.await
.get(name)
.get(&key)
.cloned()
.ok_or_else(|| {
ToolError::ExternalService(format!(
"No spawn config for MCP server '{}', cannot restart",
name
"No spawn config for MCP server '{}' (user {}), cannot restart",
name, user_id
))
})?;
// Shut down and remove old transport to avoid orphaning a wedged process.
if let Some(old_transport) = self.transports.write().await.remove(name) {
if let Some(old_transport) = self.transports.write().await.remove(&key) {
let _ = old_transport.shutdown().await;
}
@@ -141,20 +205,22 @@ impl McpProcessManager {
self.transports
.write()
.await
.insert(name.to_string(), Arc::clone(&transport));
.insert(key.clone(), Arc::clone(&transport));
tracing::info!(
"MCP stdio server '{}' restarted after {} attempt(s)",
name,
user_id = %user_id,
server = %name,
"MCP stdio server restarted after {} attempt(s)",
attempt + 1
);
return Ok(transport);
}
Err(e) => {
tracing::warn!(
"Restart attempt {}/{} for MCP server '{}' failed: {}",
user_id = %user_id,
server = %name,
"Restart attempt {}/{} failed: {}",
attempt + 1,
max_retries,
name,
e
);
last_err = Some(e);
@@ -164,14 +230,14 @@ impl McpProcessManager {
Err(last_err.unwrap_or_else(|| {
ToolError::ExternalService(format!(
"Failed to restart MCP server '{}' after {} attempts",
name, max_retries
"Failed to restart MCP server '{}' (user {}) after {} attempts",
name, user_id, max_retries
))
}))
}
/// Get names of all managed transports.
pub async fn managed_servers(&self) -> Vec<String> {
/// Get `(user_id, server_name)` pairs of all managed transports.
pub async fn managed_servers(&self) -> Vec<McpProcessKey> {
self.transports.read().await.keys().cloned().collect()
}
}
@@ -203,4 +269,16 @@ mod tests {
let manager = McpProcessManager::new();
manager.shutdown_all().await;
}
#[test]
fn test_process_key_partitions_by_user_and_server() {
let k1 = McpProcessKey::new("user-a", "stdio_server");
let k2 = McpProcessKey::new("user-b", "stdio_server");
let k3 = McpProcessKey::new("user-a", "other_server");
let k1_dup = McpProcessKey::new("user-a", "stdio_server");
assert_ne!(k1, k2, "different users on same server must not collide");
assert_ne!(k1, k3, "same user on different servers must not collide");
assert_eq!(k1, k1_dup, "same (user, server) must be equal");
}
}