tech spec

This commit is contained in:
Yunfan Yang
2026-04-27 21:13:38 -05:00
parent 823458bb7e
commit 104aafb8cf
3 changed files with 250 additions and 74 deletions

View File

@@ -36,6 +36,9 @@ use crate::server::telemetry::{BootstrappingInfo, TelemetryEvent};
use crate::terminal::event::ExecutedExecutorCommandEvent;
use crate::terminal::ShellHost;
use crate::terminal::ShellLaunchData;
#[cfg(feature = "local_tty")]
use command_executor::remote_server_executor::RemoteServerCommandExecutor;
use parking_lot::{Mutex, RwLock};
use crate::terminal::shell::{Shell, ShellType};
use crate::terminal::warpify::SubshellSource;
@@ -170,9 +173,20 @@ impl Sessions {
| RemoteServerManagerEvent::SetupStateChanged { .. }
| RemoteServerManagerEvent::BinaryCheckComplete { .. }
| RemoteServerManagerEvent::BinaryInstallComplete { .. }
| RemoteServerManagerEvent::SessionReconnected { .. }
| RemoteServerManagerEvent::ClientRequestFailed { .. }
| RemoteServerManagerEvent::ServerMessageDecodingError { .. } => {}
RemoteServerManagerEvent::SessionReconnected {
session_id: sid,
client,
..
} => {
if let Some(session) = sessions.sessions.get(sid) {
let new_executor =
Arc::new(RemoteServerCommandExecutor::new(*sid, client.clone()));
session.set_command_executor(new_executor);
log::info!("Swapped command executor for session {sid:?} after reconnect");
}
}
});
}
#[cfg(not(feature = "local_tty"))]
@@ -848,14 +862,16 @@ impl From<BootstrapSessionType> for SessionType {
pub struct Session {
info: SessionInfo,
external_commands: Arc<OnceCell<HashSet<SmolStr>>>,
command_executor: Arc<dyn CommandExecutor>,
/// The command executor for this session. Behind a `RwLock` so it can be
/// swapped after a remote server reconnect (via `set_command_executor`).
command_executor: RwLock<Arc<dyn CommandExecutor>>,
load_external_commands_future: OnceCell<Shared<BoxFuture<'static, ()>>>,
command_case_sensitivity: TopLevelCommandCaseSensitivity,
/// The authoritative session type, initially derived from the
/// [`BootstrapSessionType`] in `SessionInfo` and updated by [`Sessions`]
/// when `RemoteServerManager` reports a connected session (to fill in the
/// `host_id`). Interior mutability allows updating through `Arc<Session>`.
session_type: parking_lot::Mutex<SessionType>,
session_type: Mutex<SessionType>,
}
impl Session {
@@ -874,10 +890,10 @@ impl Session {
Self {
info: session_info,
external_commands: Arc::new(OnceCell::new()),
command_executor,
command_executor: RwLock::new(command_executor),
load_external_commands_future: Default::default(),
command_case_sensitivity,
session_type: parking_lot::Mutex::new(session_type),
session_type: Mutex::new(session_type),
}
}
@@ -1026,9 +1042,17 @@ impl Session {
&self.info.subshell_info
}
/// Replaces the command executor for this session. Used after a remote
/// server reconnect to swap in a new `RemoteServerCommandExecutor`
/// backed by the reconnected client.
///
/// Takes `&self` because the executor is stored behind a `RwLock`, so the
/// swap only needs the write lock. Call sites that already cloned the
/// previous `Arc` out of the lock keep using the old executor for that call.
pub fn set_command_executor(&self, executor: Arc<dyn CommandExecutor>) {
// The write guard is a temporary and is released at the end of this statement.
*self.command_executor.write() = executor;
}
/// Returns true if the session is employing in-band command execution to run generators.
pub fn is_using_in_band_command_execution(&self) -> bool {
self.command_executor
.read()
.as_ref()
.as_any()
.downcast_ref::<InBandCommandExecutor>()
@@ -1090,8 +1114,8 @@ impl Session {
.path
.as_deref()
.map(|path| HashMap::from_iter([("PATH".to_string(), path.to_string())]));
let windows_results = self
.command_executor
let executor = self.command_executor.read().clone();
let windows_results = executor
.execute_command(
ShellType::PowerShell.shell_command_to_get_executables(),
&Shell::new(ShellType::PowerShell, None, None, Default::default(), None),
@@ -1382,7 +1406,10 @@ impl Session {
environment_variables: Option<HashMap<String, String>>,
execute_command_options: ExecuteCommandOptions,
) -> Result<CommandOutput> {
self.command_executor
// Clone the Arc out of the lock so we don't hold the read guard
// across the await point.
let executor = self.command_executor.read().clone();
executor
.execute_command(
command,
&self.info.shell,
@@ -1395,11 +1422,13 @@ impl Session {
/// Whether the backing executor for the session supports execution of commands in parallel.
pub fn supports_parallel_command_execution(&self) -> bool {
self.command_executor.supports_parallel_command_execution()
self.command_executor
.read()
.supports_parallel_command_execution()
}
pub fn cancel_active_commands(&self) {
self.command_executor.cancel_active_commands();
self.command_executor.read().cancel_active_commands();
}
pub async fn git_branches_for_command_corrections(&self, working_dir: &str) -> Vec<String> {
@@ -1645,10 +1674,10 @@ pub mod testing {
Self {
info,
external_commands: Default::default(),
command_executor: Arc::new(TestCommandExecutor::default()),
command_executor: RwLock::new(Arc::new(TestCommandExecutor::default())),
load_external_commands_future: Default::default(),
command_case_sensitivity: TopLevelCommandCaseSensitivity::CaseSensitive,
session_type: parking_lot::Mutex::new(session_type),
session_type: Mutex::new(session_type),
}
}
@@ -1660,10 +1689,10 @@ pub mod testing {
Self {
info,
external_commands: Default::default(),
command_executor: Arc::new(TestCommandExecutor::default()),
command_executor: RwLock::new(Arc::new(TestCommandExecutor::default())),
load_external_commands_future: Default::default(),
command_case_sensitivity: TopLevelCommandCaseSensitivity::CaseSensitive,
session_type: parking_lot::Mutex::new(session_type),
session_type: Mutex::new(session_type),
}
}

View File

@@ -23,6 +23,16 @@ const MAX_RECONNECT_ATTEMPTS: u32 = 2;
/// Delay between reconnection attempts.
const RECONNECT_DELAY: Duration = Duration::from_secs(2);
/// Parameters that travel together through the reconnection flow.
///
/// Passed by value to `attempt_reconnect` and `handle_reconnect_failure`, so
/// a retry can forward the same values with only `attempt` incremented.
#[cfg(not(target_family = "wasm"))]
struct ReconnectParams {
/// Number of the current reconnect attempt. The first attempt is `1`;
/// retries stop once this reaches `MAX_RECONNECT_ATTEMPTS`.
attempt: u32,
/// Host reported in `SessionDisconnected` (and `HostDisconnected`) if
/// retries are exhausted.
host_id: HostId,
/// Exit status reported in `SessionDisconnected` if retries are exhausted.
exit_status: Option<RemoteServerExitStatus>,
/// Transport carried forward through each reconnect attempt.
transport: Arc<dyn RemoteTransport>,
/// NOTE(review): forwarded unchanged between attempts; its use is not
/// visible in this hunk — confirm against `attempt_reconnect`.
control_path: Option<PathBuf>,
}
/// Which phase of the remote server connection flow failed.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
@@ -263,11 +273,13 @@ pub enum RemoteServerManagerEvent {
ServerMessageDecodingError { session_id: SessionId },
}
/// Shell info stashed by [`RemoteServerManager::notify_session_bootstrapped`]
/// when the session is not yet in `Connected` state. Flushed automatically
/// when [`RemoteServerManager::mark_session_connected`] fires.
/// Shell info recorded by [`RemoteServerManager::notify_session_bootstrapped`].
///
/// Persists for the lifetime of the session (removed only in
/// `deregister_session`) so that `mark_session_connected` can re-send
/// the notification after a reconnect.
#[cfg_attr(target_family = "wasm", allow(dead_code))]
struct PendingSessionBootstrappedNotification {
struct SessionBootstrapInfo {
shell_type: String,
shell_path: Option<String>,
}
@@ -292,9 +304,10 @@ pub struct RemoteServerManager {
/// `navigate_to_directory` calls when `update_active_session` fires
/// repeatedly for the same CWD.
last_navigated_path: HashMap<SessionId, String>,
/// Per-session `SessionBootstrapped` notifications that arrived before the
/// session reached `Connected`. Flushed in `mark_session_connected`.
pending_bootstrapped_notifications: HashMap<SessionId, PendingSessionBootstrappedNotification>,
/// Per-session shell info recorded at bootstrap time and re-sent to the
/// remote server daemon on every (re)connect. Persists until
/// `deregister_session`.
session_bootstrap_info: HashMap<SessionId, SessionBootstrapInfo>,
/// Detected remote platform per session, populated during the binary check
/// phase via `detect_platform()`. Used for telemetry.
session_platforms: HashMap<SessionId, RemotePlatform>,
@@ -313,7 +326,7 @@ impl RemoteServerManager {
host_to_sessions: HashMap::new(),
spawner: ctx.spawner(),
last_navigated_path: HashMap::new(),
pending_bootstrapped_notifications: HashMap::new(),
session_bootstrap_info: HashMap::new(),
session_platforms: HashMap::new(),
}
}
@@ -468,12 +481,15 @@ impl RemoteServerManager {
let spawner = self.spawner.clone();
let executor = ctx.background_executor().clone();
// Wrap the transport in an Arc so it can be stored on `Connected`
// for reconnection after a spontaneous disconnect.
let transport: Arc<dyn RemoteTransport> = Arc::new(transport);
ctx.background_executor()
.spawn(async move {
match Self::run_connect_and_handshake(
session_id,
&transport,
&*transport,
&spawner,
&executor,
)
@@ -482,7 +498,12 @@ impl RemoteServerManager {
Ok(host_id) => {
let _ = spawner
.spawn(move |me, ctx| {
me.mark_session_connected(session_id, host_id, None, ctx);
me.mark_session_connected(
session_id,
host_id,
Some(transport),
ctx,
);
})
.await;
}
@@ -610,7 +631,7 @@ impl RemoteServerManager {
/// spontaneous drops -- only for explicit teardown.
pub fn deregister_session(&mut self, session_id: SessionId, ctx: &mut ModelContext<Self>) {
self.last_navigated_path.remove(&session_id);
self.pending_bootstrapped_notifications.remove(&session_id);
self.session_bootstrap_info.remove(&session_id);
self.session_platforms.remove(&session_id);
// Remove the session entry. Dropping the `RemoteSessionState`
@@ -772,19 +793,21 @@ impl RemoteServerManager {
shell_type: &str,
shell_path: Option<&str>,
) {
// Always persist so we can re-send after a reconnect.
self.session_bootstrap_info.insert(
session_id,
SessionBootstrapInfo {
shell_type: shell_type.to_owned(),
shell_path: shell_path.map(ToOwned::to_owned),
},
);
if let Some(client) = self.client_for_session(session_id) {
client.notify_session_bootstrapped(session_id, shell_type, shell_path);
} else {
log::info!(
"notify_session_bootstrapped: session {session_id:?} not yet connected, \
stashing notification"
);
self.pending_bootstrapped_notifications.insert(
session_id,
PendingSessionBootstrappedNotification {
shell_type: shell_type.to_owned(),
shell_path: shell_path.map(ToOwned::to_owned),
},
will send on connect"
);
}
}
@@ -939,18 +962,16 @@ impl RemoteServerManager {
host_id,
});
// Flush any SessionBootstrapped notification that was stashed before
// the session reached Connected.
if let Some(notif) = self.pending_bootstrapped_notifications.remove(&session_id) {
// (Re-)send the SessionBootstrapped notification so the daemon
// registers an executor for this session. This fires on both the
// initial connect and every reconnect.
if let Some(info) = self.session_bootstrap_info.get(&session_id) {
if let Some(client) = self.client_for_session(session_id) {
log::info!(
"Flushing stashed SessionBootstrapped notification for session \
{session_id:?}"
);
log::info!("Sending SessionBootstrapped notification for session {session_id:?}");
client.notify_session_bootstrapped(
session_id,
&notif.shell_type,
notif.shell_path.as_deref(),
&info.shell_type,
info.shell_path.as_deref(),
);
}
}
@@ -1001,7 +1022,6 @@ impl RemoteServerManager {
session_id: SessionId,
ctx: &mut ModelContext<Self>,
) {
self.pending_bootstrapped_notifications.remove(&session_id);
let Some(prev) = self.sessions.remove(&session_id) else {
return;
};
@@ -1044,11 +1064,13 @@ impl RemoteServerManager {
self.attempt_reconnect(
session_id,
1,
host_id,
exit_status,
transport,
control_path,
ReconnectParams {
attempt: 1,
host_id,
exit_status,
transport,
control_path,
},
ctx,
);
} else {
@@ -1078,13 +1100,17 @@ impl RemoteServerManager {
fn attempt_reconnect(
&mut self,
session_id: SessionId,
attempt: u32,
host_id: HostId,
exit_status: Option<RemoteServerExitStatus>,
transport: Arc<dyn RemoteTransport>,
control_path: Option<PathBuf>,
params: ReconnectParams,
ctx: &mut ModelContext<Self>,
) {
let ReconnectParams {
attempt,
host_id,
exit_status,
transport,
control_path,
} = params;
log::info!(
"Attempting reconnect for session {session_id:?} \
(attempt {attempt}/{MAX_RECONNECT_ATTEMPTS})"
@@ -1155,11 +1181,13 @@ impl RemoteServerManager {
.spawn(move |me, ctx| {
me.handle_reconnect_failure(
session_id,
attempt,
host_id,
exit_status,
transport,
control_path,
ReconnectParams {
attempt,
host_id,
exit_status,
transport,
control_path,
},
ctx,
);
})
@@ -1175,34 +1203,34 @@ impl RemoteServerManager {
fn handle_reconnect_failure(
&mut self,
session_id: SessionId,
attempt: u32,
host_id: HostId,
exit_status: Option<RemoteServerExitStatus>,
transport: Arc<dyn RemoteTransport>,
control_path: Option<PathBuf>,
params: ReconnectParams,
ctx: &mut ModelContext<Self>,
) {
if attempt < MAX_RECONNECT_ATTEMPTS {
if params.attempt < MAX_RECONNECT_ATTEMPTS {
self.attempt_reconnect(
session_id,
attempt + 1,
host_id,
exit_status,
transport,
control_path,
ReconnectParams {
attempt: params.attempt + 1,
..params
},
ctx,
);
} else {
log::warn!("Reconnect exhausted for session {session_id:?} after {attempt} attempt(s)");
log::warn!(
"Reconnect exhausted for session {session_id:?} after {} attempt(s)",
params.attempt
);
self.sessions
.insert(session_id, RemoteSessionState::Disconnected);
ctx.emit(RemoteServerManagerEvent::SessionDisconnected {
session_id,
host_id: host_id.clone(),
exit_status,
host_id: params.host_id.clone(),
exit_status: params.exit_status,
});
if !self.host_to_sessions.contains_key(&host_id) {
ctx.emit(RemoteServerManagerEvent::HostDisconnected { host_id });
if !self.host_to_sessions.contains_key(&params.host_id) {
ctx.emit(RemoteServerManagerEvent::HostDisconnected {
host_id: params.host_id,
});
}
}
}

119
specs/APP-4283/TECH.md Normal file
View File

@@ -0,0 +1,119 @@
# APP-4283: Remote Server Reconnection on Spontaneous Disconnect
## Context
When the SSH remote server connection drops spontaneously (daemon crash, proxy killed, transient network failure), the `RemoteServerClient` reader task hits EOF and calls `mark_session_disconnected`. Before this change the session transitions straight to `Disconnected` and stays there — completions, repo metadata, and all other remote-server-backed features stop working until the user manually exits and re-SSHes.
The remote server architecture uses a **proxy → daemon** model:
- `SshTransport` (`app/src/remote_server/ssh_transport.rs`) spawns `ssh … remote-server-proxy` whose stdin/stdout become the protocol channel.
- The proxy (`app/src/remote_server/unix/proxy.rs:34`) connects to a long-lived daemon via a local Unix socket. The daemon survives across proxy lifetimes.
The core session lifecycle lives in `RemoteServerManager` (`crates/remote_server/src/manager.rs`), a singleton model with per-session state tracked via `RemoteSessionState`. The state machine before this change:
```
Connecting → Initializing → Connected → Disconnected
```
Key files:
- `crates/remote_server/src/manager.rs` — `RemoteServerManager`, session state machine, connect/disconnect lifecycle
- `crates/remote_server/src/transport.rs` — `RemoteTransport` trait, `Connection` struct
- `app/src/remote_server/ssh_transport.rs` — SSH `RemoteTransport` implementation
- `app/src/terminal/model/session.rs (135-191)` — `Sessions` model, subscribes to manager events, owns the `Session` and its `CommandExecutor`
- `app/src/terminal/model/session.rs (862-1050)` — `Session` struct, holds the `command_executor` used by completions
## Proposed changes
### 1. Object-safe `RemoteTransport` for reconnection
The `RemoteTransport` trait (`crates/remote_server/src/transport.rs:63`) now returns boxed futures (`Pin<Box<dyn Future<…> + Send>>`) for object safety. This allows `RemoteServerManager` to store `Arc<dyn RemoteTransport>` on the `Connected` state and carry it forward through reconnection without knowing the concrete transport type.
`SshTransport` (`app/src/remote_server/ssh_transport.rs:36`) implements the trait directly with `Box::pin(async move { … })`.
### 2. Exit status capture
New `RemoteServerExitStatus` type (`manager.rs:67`) records `code: Option<i32>` and `signal_killed: bool`.
`capture_exit_status()` (`manager.rs:982`) reads `child.try_status()` before the `Child` is dropped on disconnect. The result is carried on `SessionDisconnected.exit_status` for diagnostics and telemetry.
### 3. Reconnection state machine
New state variant `Reconnecting { attempt, host_id, control_path }` (`manager.rs:139`). The state machine becomes:
```
Connecting → Initializing → Connected → Reconnecting → Initializing → Connected
↘ Disconnected (if retries exhausted)
```
Constants: `MAX_RECONNECT_ATTEMPTS = 2`, `RECONNECT_DELAY = 2s` (`manager.rs:22-24`).
`mark_session_disconnected()` (`manager.rs:1020`) checks whether the session was `Connected` with a stored transport. If so it clears stale host-index and repo-metadata state, then calls `attempt_reconnect()`.
`attempt_reconnect()` (`manager.rs:1100`) transitions to `Reconnecting`, waits `RECONNECT_DELAY` via `async_io::Timer`, then calls the shared `run_connect_and_handshake()`. On success it calls `mark_session_connected()` and emits `SessionReconnected`. On failure, `handle_reconnect_failure()` (`manager.rs:1203`) either increments the attempt and retries, or gives up and emits `SessionDisconnected`.
The retry parameters are bundled in `ReconnectParams` (`manager.rs:28`) to stay under clippy's argument limit.
### 4. Shared connect + handshake helper
`run_connect_and_handshake()` (`manager.rs:546`) is extracted from `connect_session()` so both the initial connect and reconnect share the same two-phase logic: `transport.connect()` → `Initializing` → `client.initialize()` → `HostId`.
### 5. Session bootstrap info persistence
Previously, `notify_session_bootstrapped()` stashed shell info in a `pending_bootstrapped_notifications` map that was consumed on the first connect and wiped on disconnect. After reconnect, the daemon had no executor registered for the session (`SessionNotFound` error on completions).
Fix: renamed to `session_bootstrap_info` (`manager.rs:310`), which persists for the session lifetime (removed only in `deregister_session`). `notify_session_bootstrapped()` (`manager.rs:790`) always stores the info. `mark_session_connected()` (`manager.rs:965`) re-sends the `SessionBootstrapped` notification on every connect/reconnect.
### 6. Command executor swap on reconnect
`Session.command_executor` (`session.rs:867`) changed from `Arc<dyn CommandExecutor>` to `RwLock<Arc<dyn CommandExecutor>>` for interior mutability through `Arc<Session>`. New `Session::set_command_executor()` (`session.rs:1048`).
All read sites (e.g. `execute_command` at `session.rs:1117`, `load_external_commands` at `session.rs:1083`) clone the `Arc` out of the lock before `.await` to avoid holding the guard across await points.
The `Sessions` model subscribes to `SessionReconnected` (`session.rs:178-189`) and swaps in a new `RemoteServerCommandExecutor` backed by the reconnected client.
### 7. Downstream match arm updates
Added exhaustive match arms for `SessionReconnected` in `Sessions` subscriber (`session.rs:178`) and `terminal/view.rs`.
## Diagram
```mermaid
stateDiagram-v2
[*] --> Connecting: connect_session()
Connecting --> Initializing: transport.connect() ok
Initializing --> Connected: client.initialize() ok
Connected --> Reconnecting: spontaneous EOF + transport available
Reconnecting --> Initializing: transport.connect() ok (after delay)
Reconnecting --> Reconnecting: attempt < MAX (after delay)
Reconnecting --> Disconnected: retries exhausted
Connected --> Disconnected: no transport / explicit deregister
Initializing --> Disconnected: handshake failed
```
## Testing and validation
Manual E2E (verified):
1. Build Warp from this branch (`cargo run`).
2. SSH into a remote host with the remote server feature flag enabled.
3. Kill the `remote-server-proxy` on the remote side: `ssh <host> "pkill -f remote-server-proxy"`.
4. Observe in `Warp.log`:
- `"Remote server process exited for session …"` — exit status captured
- `"Spontaneous disconnect for session …, will attempt reconnect"` — reconnect triggered
- `"Attempting reconnect for session … (attempt 1/2)"` — delay + retry
- `"Remote server connected for session …"` — reconnect succeeded
- `"Sending SessionBootstrapped notification for session …"` — daemon re-registered
- `"Swapped command executor for session … after reconnect"` — executor swapped
5. Press Tab in the SSH session to trigger completions — should work.
6. Verify `navigate_to_directory` re-fires (repo metadata restored).
Edge cases verified manually:
- Kill the daemon (`pkill -f remote-server-daemon`) — harder reconnect since daemon must restart; proxy's `run()` re-spawns it.
- `deregister_session` during reconnect delay — reconnect aborts cleanly (checked via `sessions.contains_key`).
## Follow-ups
- **User-visible reconnecting indicator**: surface the `Reconnecting` state in the terminal UI so the user knows a retry is in progress.
- **Telemetry**: emit a structured event with `exit_status`, `attempt`, and `reconnect_succeeded` for reconnect outcomes.
- **Exponential backoff**: the current fixed 2s delay works for the proxy-restart case; longer backoffs may be warranted for network-level failures.