Add reconnect logic to remote server

This commit is contained in:
Yunfan Yang
2026-04-27 16:56:32 -05:00
parent bc3fffa7e5
commit 823458bb7e
7 changed files with 486 additions and 224 deletions

1
Cargo.lock generated
View File

@@ -10559,6 +10559,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"async-io",
"async-process",
"command",
"dashmap",

View File

@@ -3,13 +3,18 @@
//! [`SshTransport`] uses an existing SSH ControlMaster socket to check/install
//! the remote server binary and to launch the `remote-server-proxy` process
//! whose stdin/stdout become the protocol channel.
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use anyhow::Result;
use warpui::r#async::executor;
use remote_server::client::RemoteServerClient;
use remote_server::setup::RemotePlatform;
use remote_server::setup::{self, RemotePlatform, CHECK_TIMEOUT, INSTALL_TIMEOUT};
use remote_server::ssh::{run_ssh_command, run_ssh_script, ssh_args};
use remote_server::transport::{Connection, RemoteTransport};
/// SSH transport: connects via a ControlMaster socket.
@@ -18,7 +23,7 @@ use remote_server::transport::{Connection, RemoteTransport};
/// process (`ssh -N -o ControlMaster=yes -o ControlPath=<path>`). All SSH
/// commands (binary check, install, proxy launch) are multiplexed through
/// this socket without re-authenticating.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct SshTransport {
socket_path: PathBuf,
}
@@ -28,123 +33,112 @@ impl SshTransport {
Self { socket_path }
}
}
impl RemoteTransport for SshTransport {
async fn detect_platform(&self) -> Result<RemotePlatform, String> {
match remote_server::ssh::run_ssh_command(
&self.socket_path,
"uname -sm",
remote_server::setup::CHECK_TIMEOUT,
)
.await
{
Ok(output) if output.status.success() => {
let stdout = String::from_utf8_lossy(&output.stdout);
remote_server::setup::parse_uname_output(&stdout).map_err(|e| format!("{e:#}"))
}
Ok(output) => {
let code = output.status.code().unwrap_or(-1);
let stderr = String::from_utf8_lossy(&output.stderr);
Err(format!("uname -sm exited with code {code}: {stderr}"))
}
Err(e) => Err(format!("{e:#}")),
}
}
async fn check_binary(&self) -> Result<bool, String> {
let bin_path = remote_server::setup::remote_server_binary();
log::info!("Checking for remote server binary at {bin_path}");
match remote_server::ssh::run_ssh_command(
&self.socket_path,
&remote_server::setup::binary_check_command(),
remote_server::setup::CHECK_TIMEOUT,
)
.await
{
// `test -x` exits 0 when present, 1 when missing.
// Any other exit code (or None / signal) is treated as a check failure.
Ok(output) => match output.status.code() {
Some(0) => Ok(true),
Some(1) => Ok(false),
Some(code) => {
let stderr = String::from_utf8_lossy(&output.stderr);
Err(format!("binary check exited with code {code}: {stderr}"))
fn detect_platform(
&self,
) -> Pin<Box<dyn Future<Output = Result<RemotePlatform, String>> + Send>> {
let socket_path = self.socket_path.clone();
Box::pin(async move {
match run_ssh_command(&socket_path, "uname -sm", CHECK_TIMEOUT).await {
Ok(output) if output.status.success() => {
let stdout = String::from_utf8_lossy(&output.stdout);
setup::parse_uname_output(&stdout).map_err(|e| format!("{e:#}"))
}
None => Err("binary check terminated by signal".into()),
},
Err(e) => Err(format!("{e:#}")),
}
}
async fn install_binary(&self) -> Result<(), String> {
let script = remote_server::setup::install_script();
log::info!(
"Installing remote server binary to {}",
remote_server::setup::remote_server_binary()
);
match remote_server::ssh::run_ssh_script(
&self.socket_path,
&script,
remote_server::setup::INSTALL_TIMEOUT,
)
.await
{
Ok(output) if output.status.success() => Ok(()),
Ok(output) => {
let code = output.status.code().unwrap_or(-1);
let stderr = String::from_utf8_lossy(&output.stderr);
Err(format!("install script failed (exit {code}): {stderr}"))
Ok(output) => {
let code = output.status.code().unwrap_or(-1);
let stderr = String::from_utf8_lossy(&output.stderr);
Err(format!("uname -sm exited with code {code}: {stderr}"))
}
Err(e) => Err(format!("{e:#}")),
}
Err(e) => Err(format!("{e:#}")),
}
})
}
async fn connect(&self, executor: &executor::Background) -> Result<Connection> {
let binary = remote_server::setup::remote_server_binary();
let mut args = remote_server::ssh::ssh_args(&self.socket_path);
args.push(format!("{binary} remote-server-proxy"));
fn check_binary(&self) -> Pin<Box<dyn Future<Output = Result<bool, String>> + Send>> {
let socket_path = self.socket_path.clone();
Box::pin(async move {
let bin_path = setup::remote_server_binary();
log::info!("Checking for remote server binary at {bin_path}");
match run_ssh_command(&socket_path, &setup::binary_check_command(), CHECK_TIMEOUT).await
{
Ok(output) => match output.status.code() {
Some(0) => Ok(true),
Some(1) => Ok(false),
Some(code) => {
let stderr = String::from_utf8_lossy(&output.stderr);
Err(format!("binary check exited with code {code}: {stderr}"))
}
None => Err("binary check terminated by signal".into()),
},
Err(e) => Err(format!("{e:#}")),
}
})
}
// `kill_on_drop(true)` pairs with ownership of the `Child` being
// returned in the [`Connection`] below: the
// [`RemoteServerManager`] holds the `Child` on its per-session
// state, and dropping that state (on explicit teardown or
// spontaneous disconnect) sends SIGKILL to this ssh process.
// Without this the ssh child is orphaned and keeps a channel
// open on the ControlMaster socket, blocking the master from
// exiting cleanly when the user logs out.
//
// Note that the child's lifetime is decoupled from any
// `Arc<RemoteServerClient>` clones: other owners (e.g. the
// per-session command executor) can keep the client alive for
// their own purposes without pinning the subprocess.
let mut child = command::r#async::Command::new("ssh")
.args(&args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true)
.spawn()?;
fn install_binary(&self) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
let socket_path = self.socket_path.clone();
Box::pin(async move {
let script = setup::install_script();
log::info!(
"Installing remote server binary to {}",
setup::remote_server_binary()
);
match run_ssh_script(&socket_path, &script, INSTALL_TIMEOUT).await {
Ok(output) if output.status.success() => Ok(()),
Ok(output) => {
let code = output.status.code().unwrap_or(-1);
let stderr = String::from_utf8_lossy(&output.stderr);
Err(format!("install script failed (exit {code}): {stderr}"))
}
Err(e) => Err(format!("{e:#}")),
}
})
}
let stdin = child
.stdin
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stdin"))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stdout"))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stderr"))?;
fn connect(
&self,
executor: Arc<executor::Background>,
) -> Pin<Box<dyn Future<Output = Result<Connection>> + Send>> {
let socket_path = self.socket_path.clone();
Box::pin(async move {
let binary = setup::remote_server_binary();
let mut args = ssh_args(&socket_path);
args.push(format!("{binary} remote-server-proxy"));
let (client, event_rx) =
RemoteServerClient::from_child_streams(stdin, stdout, stderr, executor);
Ok(Connection {
client,
event_rx,
child,
control_path: Some(self.socket_path.clone()),
// `kill_on_drop(true)` pairs with ownership of the `Child` being
// returned in the [`Connection`] below: the
// [`RemoteServerManager`] holds the `Child` on its per-session
// state, and dropping that state (on explicit teardown or
// spontaneous disconnect) sends SIGKILL to this ssh process.
let mut child = command::r#async::Command::new("ssh")
.args(&args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()?;
let stdin = child
.stdin
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stdin"))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stdout"))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stderr"))?;
let (client, event_rx) =
RemoteServerClient::from_child_streams(stdin, stdout, stderr, &executor);
Ok(Connection {
client,
event_rx,
child,
control_path: Some(socket_path),
})
})
}
}

View File

@@ -170,6 +170,7 @@ impl Sessions {
| RemoteServerManagerEvent::SetupStateChanged { .. }
| RemoteServerManagerEvent::BinaryCheckComplete { .. }
| RemoteServerManagerEvent::BinaryInstallComplete { .. }
| RemoteServerManagerEvent::SessionReconnected { .. }
| RemoteServerManagerEvent::ClientRequestFailed { .. }
| RemoteServerManagerEvent::ServerMessageDecodingError { .. } => {}
});

View File

@@ -4376,6 +4376,7 @@ impl TerminalView {
}
}
RemoteServerManagerEvent::SessionConnecting { .. }
| RemoteServerManagerEvent::SessionReconnected { .. }
| RemoteServerManagerEvent::HostConnected { .. }
| RemoteServerManagerEvent::HostDisconnected { .. }
| RemoteServerManagerEvent::RepoMetadataSnapshot { .. }

View File

@@ -24,6 +24,7 @@ warp_util.workspace = true
warpui.workspace = true
[target.'cfg(not(target_family = "wasm"))'.dependencies]
async-io.workspace = true
async-process.workspace = true
[target.'cfg(target_family = "wasm")'.dependencies]

View File

@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
#[cfg(not(target_family = "wasm"))]
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
#[cfg(not(target_family = "wasm"))]
use crate::client::ClientEvent;
@@ -17,6 +18,11 @@ use serde::Serialize;
use warp_core::SessionId;
use warpui::{Entity, ModelContext, ModelSpawner, SingletonEntity};
/// Maximum number of reconnection attempts after a spontaneous disconnect.
const MAX_RECONNECT_ATTEMPTS: u32 = 2;
/// Delay between reconnection attempts.
const RECONNECT_DELAY: Duration = Duration::from_secs(2);
/// Which phase of the remote server connection flow failed.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
@@ -45,6 +51,16 @@ pub enum RemoteServerErrorKind {
Other,
}
/// Exit status information captured from the remote server subprocess
/// when the connection drops. Used for diagnostics and telemetry.
#[derive(Clone, Debug, Serialize)]
pub struct RemoteServerExitStatus {
/// Process exit code, if the process exited normally.
pub code: Option<i32>,
/// True if the process was killed by a signal (Unix only).
pub signal_killed: bool,
}
impl RemoteServerErrorKind {
/// Classify a [`ClientError`] into a telemetry error kind.
pub fn from_client_error(error: &crate::client::ClientError) -> Self {
@@ -104,6 +120,16 @@ pub enum RemoteSessionState {
/// See type-level doc.
#[cfg(not(target_family = "wasm"))]
control_path: Option<PathBuf>,
/// Transport stored for reconnection after spontaneous disconnect.
#[cfg(not(target_family = "wasm"))]
transport: Option<Arc<dyn RemoteTransport>>,
},
/// A reconnection attempt is in progress after a spontaneous disconnect.
#[cfg(not(target_family = "wasm"))]
Reconnecting {
attempt: u32,
host_id: HostId,
control_path: Option<PathBuf>,
},
/// Connection dropped (EOF/error from the reader task).
Disconnected,
@@ -145,6 +171,19 @@ pub enum RemoteServerManagerEvent {
SessionDisconnected {
session_id: SessionId,
host_id: HostId,
/// Exit status of the remote server subprocess, if available.
/// `None` when the session was explicitly deregistered or when
/// the exit status could not be determined.
exit_status: Option<RemoteServerExitStatus>,
},
/// A reconnection attempt succeeded. Downstream owners (e.g.
/// `RemoteServerCommandExecutor`) should swap their client reference
/// to the new one carried in `client`.
SessionReconnected {
session_id: SessionId,
host_id: HostId,
attempt: u32,
client: Arc<RemoteServerClient>,
},
/// The manager is no longer tracking this session -- it has been
/// removed from the `sessions` map via `deregister_session`. Fires
@@ -417,10 +456,7 @@ impl RemoteServerManager {
{
log::info!("Starting remote server connection for session {session_id:?}");
// Advance the user-visible setup pipeline. Both callers (binary
// already installed, and binary just installed) enter this
// method right when the Initializing phase begins, so we emit
// the state change from one place.
// Advance the user-visible setup pipeline.
ctx.emit(RemoteServerManagerEvent::SetupStateChanged {
session_id,
state: RemoteServerSetupState::Initializing,
@@ -435,95 +471,23 @@ impl RemoteServerManager {
ctx.background_executor()
.spawn(async move {
// ---- Phase 1: Connect (establish streams, create client) ----
match transport.connect(&executor).await {
Ok(Connection {
client,
event_rx,
child,
control_path,
}) => {
let client = Arc::new(client);
// Transition to Initializing and start draining
// the event channel for push events and disconnect.
// The `Child` is stashed on the session state so
// its lifetime is controlled by the manager -- on
// teardown the state is dropped, which runs the
// `Child`'s destructor and SIGKILLs the subprocess
// via `kill_on_drop`. `control_path` is stashed
// for explicit teardown's `ssh -O exit` call.
let client_for_state = Arc::clone(&client);
match Self::run_connect_and_handshake(
session_id,
&transport,
&spawner,
&executor,
)
.await
{
Ok(host_id) => {
let _ = spawner
.spawn(move |me, ctx| {
me.sessions.insert(
session_id,
RemoteSessionState::Initializing {
client: client_for_state,
_child: child,
control_path,
},
);
// Drain the event channel on the main thread.
// Each push event is forwarded as a manager
// event in real-time. When the stream closes
// (after Disconnected or channel drop), we
// transition the session to Disconnected.
ctx.spawn_stream_local(
event_rx,
move |me, event, ctx| {
me.forward_client_event(session_id, event, ctx);
},
move |me, ctx| {
me.mark_session_disconnected(session_id, ctx);
},
);
me.mark_session_connected(session_id, host_id, None, ctx);
})
.await;
// ---- Phase 2: Initialize handshake ----
match client.initialize().await {
Ok(resp) => {
let host_id = HostId::new(resp.host_id);
let _ = spawner
.spawn(move |me, ctx| {
me.mark_session_connected(session_id, host_id, ctx);
})
.await;
}
Err(e) => {
log::error!(
"Initialize handshake failed for session {session_id:?}: {e}"
);
let error = format!("{e:#}");
let _ = spawner
.spawn(move |me, ctx| {
ctx.emit(
RemoteServerManagerEvent::SetupStateChanged {
session_id,
state: RemoteServerSetupState::Failed {
error: error.clone(),
},
},
);
ctx.emit(
RemoteServerManagerEvent::SessionConnectionFailed {
session_id,
phase: RemoteServerInitPhase::Initialize,
error,
},
);
me.mark_session_disconnected(session_id, ctx);
})
.await;
}
}
}
Err(e) => {
log::error!(
"Failed to connect remote server for session {session_id:?}: {e:#}"
);
log::error!("Connection failed for session {session_id:?}: {e:#}");
let error = format!("{e:#}");
let _ = spawner
.spawn(move |me, ctx| {
@@ -548,6 +512,65 @@ impl RemoteServerManager {
}
}
/// Shared connect + handshake logic used by both `connect_session` and
/// `attempt_reconnect`.
///
/// 1. Calls `transport.connect()` to establish streams.
/// 2. Transitions the session to `Initializing` and starts draining the
/// event channel.
/// 3. Runs the initialize handshake.
///
/// Returns `Ok(host_id)` on success, or an error if either phase fails.
#[cfg(not(target_family = "wasm"))]
async fn run_connect_and_handshake(
session_id: SessionId,
transport: &dyn RemoteTransport,
spawner: &ModelSpawner<Self>,
executor: &Arc<warpui::r#async::executor::Background>,
) -> anyhow::Result<HostId> {
// Phase 1: Connect (establish streams, create client).
let Connection {
client,
event_rx,
child,
control_path,
} = transport.connect(executor.clone()).await?;
let client = Arc::new(client);
let client_for_init = Arc::clone(&client);
// Transition to Initializing and start draining the event channel.
let _ = spawner
.spawn(move |me, ctx| {
me.sessions.insert(
session_id,
RemoteSessionState::Initializing {
client: client_for_init,
_child: child,
control_path,
},
);
ctx.spawn_stream_local(
event_rx,
move |me, event, ctx| {
me.forward_client_event(session_id, event, ctx);
},
move |me, ctx| {
me.mark_session_disconnected(session_id, ctx);
},
);
})
.await;
// Phase 2: Initialize handshake.
let resp = client
.initialize()
.await
.map_err(|e| anyhow::anyhow!("{e:#}"))?;
Ok(HostId::new(resp.host_id))
}
/// Removes a session from the manager and tears down its connection.
///
/// Assumes the caller has already observed that the user's shell
@@ -604,19 +627,26 @@ impl RemoteServerManager {
let control_path = match &prev {
Some(RemoteSessionState::Connected { control_path, .. })
| Some(RemoteSessionState::Initializing { control_path, .. }) => control_path.clone(),
Some(RemoteSessionState::Reconnecting { control_path, .. }) => control_path.clone(),
_ => None,
};
if let Some(RemoteSessionState::Connected { host_id, .. }) = prev {
// Extract `host_id` from states that track a host connection.
let host_id = match &prev {
Some(RemoteSessionState::Connected { host_id, .. }) => Some(host_id.clone()),
#[cfg(not(target_family = "wasm"))]
Some(RemoteSessionState::Reconnecting { host_id, .. }) => Some(host_id.clone()),
_ => None,
};
if let Some(host_id) = host_id {
self.remove_from_host_index(&host_id, session_id);
ctx.emit(RemoteServerManagerEvent::SessionDisconnected {
session_id,
host_id: host_id.clone(),
exit_status: None,
});
if !self.host_to_sessions.contains_key(&host_id) {
ctx.emit(RemoteServerManagerEvent::HostDisconnected {
host_id: host_id.clone(),
});
ctx.emit(RemoteServerManagerEvent::HostDisconnected { host_id });
}
}
ctx.emit(RemoteServerManagerEvent::SessionDeregistered { session_id });
@@ -856,18 +886,21 @@ impl RemoteServerManager {
}
}
/// Transitions a session from `Initializing` to `Connected`. Accepts an
/// optional `transport` for reconnection support. When called from the
/// initial connect path the transport is `None`. When called from the
/// reconnect path the transport is carried forward.
#[cfg(not(target_family = "wasm"))]
fn mark_session_connected(
&mut self,
session_id: SessionId,
host_id: HostId,
transport: Option<Arc<dyn RemoteTransport>>,
ctx: &mut ModelContext<Self>,
) {
log::info!("Remote server connected for session {session_id:?}, host {host_id}");
// Only transition if the session is still in Initializing state.
// Remove first so we can move the client handle (and owned `Child`)
// out.
let Some(RemoteSessionState::Initializing {
client,
_child,
@@ -881,10 +914,11 @@ impl RemoteServerManager {
self.sessions.insert(
session_id,
RemoteSessionState::Connected {
client,
client: client.clone(),
host_id: host_id.clone(),
_child,
control_path,
transport,
},
);
self.host_to_sessions
@@ -922,6 +956,45 @@ impl RemoteServerManager {
}
}
/// Captures the exit status from a `Child` process, if available.
#[cfg(not(target_family = "wasm"))]
fn capture_exit_status(
child: &mut async_process::Child,
session_id: SessionId,
) -> Option<RemoteServerExitStatus> {
match child.try_status() {
Ok(Some(status)) => {
let code = status.code();
#[cfg(unix)]
let signal_killed = {
use std::os::unix::process::ExitStatusExt;
status.signal().is_some()
};
#[cfg(not(unix))]
let signal_killed = false;
log::warn!(
"Remote server process exited for session {session_id:?}: \
code={code:?}, signal_killed={signal_killed}"
);
Some(RemoteServerExitStatus {
code,
signal_killed,
})
}
Ok(None) => {
log::warn!(
"Remote server process still running for session {session_id:?} \
despite EOF on reader task"
);
None
}
Err(e) => {
log::warn!("Failed to read exit status for session {session_id:?}: {e}");
None
}
}
}
#[cfg(not(target_family = "wasm"))]
pub(crate) fn mark_session_disconnected(
&mut self,
@@ -932,20 +1005,201 @@ impl RemoteServerManager {
let Some(prev) = self.sessions.remove(&session_id) else {
return;
};
self.sessions
.insert(session_id, RemoteSessionState::Disconnected);
if let RemoteSessionState::Connected { host_id, .. } = prev {
self.remove_from_host_index(&host_id, session_id);
// Emit `SessionDisconnected` before `HostDisconnected` so that
// subscribers (e.g. the command executor) drop their
// `Arc<RemoteServerClient>` reference before any host-scoped
// teardown runs. This matches the ordering in
// `deregister_session` so both teardown paths look identical
// to subscribers.
// Only attempt reconnect for sessions that were in Connected state
// with a transport available, and not being explicitly deregistered.
if let RemoteSessionState::Connected {
host_id,
mut _child,
control_path,
transport,
..
} = prev
{
let exit_status = Self::capture_exit_status(&mut _child, session_id);
// Drop the old child process explicitly before reconnecting.
drop(_child);
let can_reconnect = transport.is_some();
if can_reconnect {
let transport = transport.unwrap();
log::info!(
"Spontaneous disconnect for session {session_id:?}, \
will attempt reconnect (transport={transport:?})"
);
// Clear stale repo metadata and host index so downstream
// models don't hold onto data from the dead server process.
self.remove_from_host_index(&host_id, session_id);
if !self.host_to_sessions.contains_key(&host_id) {
ctx.emit(RemoteServerManagerEvent::HostDisconnected {
host_id: host_id.clone(),
});
}
// Clear last navigated path so navigate_to_directory
// re-fires after reconnect.
self.last_navigated_path.remove(&session_id);
self.attempt_reconnect(
session_id,
1,
host_id,
exit_status,
transport,
control_path,
ctx,
);
} else {
// No reconnect — fall through to terminal Disconnected.
self.sessions
.insert(session_id, RemoteSessionState::Disconnected);
self.remove_from_host_index(&host_id, session_id);
ctx.emit(RemoteServerManagerEvent::SessionDisconnected {
session_id,
host_id: host_id.clone(),
exit_status,
});
if !self.host_to_sessions.contains_key(&host_id) {
ctx.emit(RemoteServerManagerEvent::HostDisconnected { host_id });
}
}
} else {
// Non-Connected states (Initializing, Connecting, etc.) —
// no reconnect, just mark disconnected.
self.sessions
.insert(session_id, RemoteSessionState::Disconnected);
}
}
/// Attempt to re-establish the remote server connection.
#[cfg(not(target_family = "wasm"))]
fn attempt_reconnect(
&mut self,
session_id: SessionId,
attempt: u32,
host_id: HostId,
exit_status: Option<RemoteServerExitStatus>,
transport: Arc<dyn RemoteTransport>,
control_path: Option<PathBuf>,
ctx: &mut ModelContext<Self>,
) {
log::info!(
"Attempting reconnect for session {session_id:?} \
(attempt {attempt}/{MAX_RECONNECT_ATTEMPTS})"
);
self.sessions.insert(
session_id,
RemoteSessionState::Reconnecting {
attempt,
host_id: host_id.clone(),
control_path: control_path.clone(),
},
);
let spawner = self.spawner.clone();
let executor = ctx.background_executor().clone();
let transport_clone = Arc::clone(&transport);
ctx.background_executor()
.spawn(async move {
async_io::Timer::after(RECONNECT_DELAY).await;
// Check if the session was deregistered during the delay.
// (Checked via spawner since sessions lives on the main thread.)
let was_removed = spawner
.spawn(move |me, _ctx| !me.sessions.contains_key(&session_id))
.await
.unwrap_or(true);
if was_removed {
log::info!("Session {session_id:?} removed during reconnect delay, aborting");
return;
}
match Self::run_connect_and_handshake(
session_id,
&*transport_clone,
&spawner,
&executor,
)
.await
{
Ok(new_host_id) => {
let _ = spawner
.spawn(move |me, ctx| {
me.mark_session_connected(
session_id,
new_host_id.clone(),
Some(transport),
ctx,
);
if let Some(client) = me.client_for_session(session_id).cloned() {
ctx.emit(RemoteServerManagerEvent::SessionReconnected {
session_id,
host_id: new_host_id,
attempt,
client,
});
}
})
.await;
}
Err(e) => {
log::error!(
"Reconnect failed for session {session_id:?} \
(attempt {attempt}): {e:#}"
);
let _ = spawner
.spawn(move |me, ctx| {
me.handle_reconnect_failure(
session_id,
attempt,
host_id,
exit_status,
transport,
control_path,
ctx,
);
})
.await;
}
}
})
.detach();
}
/// Handle a failed reconnection attempt: either retry or give up.
#[cfg(not(target_family = "wasm"))]
fn handle_reconnect_failure(
&mut self,
session_id: SessionId,
attempt: u32,
host_id: HostId,
exit_status: Option<RemoteServerExitStatus>,
transport: Arc<dyn RemoteTransport>,
control_path: Option<PathBuf>,
ctx: &mut ModelContext<Self>,
) {
if attempt < MAX_RECONNECT_ATTEMPTS {
self.attempt_reconnect(
session_id,
attempt + 1,
host_id,
exit_status,
transport,
control_path,
ctx,
);
} else {
log::warn!("Reconnect exhausted for session {session_id:?} after {attempt} attempt(s)");
self.sessions
.insert(session_id, RemoteSessionState::Disconnected);
ctx.emit(RemoteServerManagerEvent::SessionDisconnected {
session_id,
host_id: host_id.clone(),
exit_status,
});
if !self.host_to_sessions.contains_key(&host_id) {
ctx.emit(RemoteServerManagerEvent::HostDisconnected { host_id });

View File

@@ -6,13 +6,13 @@
//! in-process for tests) implement the same trait without touching the
//! manager.
//!
//! Methods are async. Callers use the trait via generics
//! (`T: RemoteTransport`) rather than `dyn` dispatch.
//! Returns boxed futures for object safety — the manager stores
//! `Arc<dyn RemoteTransport>` for reconnection.
//!
//! [`RemoteServerManager`]: crate::manager::RemoteServerManager
use std::future::Future;
#[cfg(not(target_family = "wasm"))]
use std::path::PathBuf;
use std::pin::Pin;
use async_channel::Receiver;
use warpui::r#async::executor;
@@ -56,12 +56,18 @@ pub struct Connection {
pub control_path: Option<PathBuf>,
}
pub trait RemoteTransport: Send + Sync {
/// Transport abstraction for remote server connections.
///
/// Object-safe: returns boxed futures so implementations can be stored
/// as `Arc<dyn RemoteTransport>` for reconnection.
pub trait RemoteTransport: Send + Sync + std::fmt::Debug {
/// Detects the remote host's OS and architecture by running `uname -sm`.
///
/// Returns the parsed [`RemotePlatform`] on success, or an error string
/// if the command fails or the output cannot be parsed.
fn detect_platform(&self) -> impl Future<Output = Result<RemotePlatform, String>> + Send;
fn detect_platform(
&self,
) -> Pin<Box<dyn std::future::Future<Output = Result<RemotePlatform, String>> + Send>>;
/// Checks whether the remote server binary is present on the remote host.
///
@@ -72,7 +78,9 @@ pub trait RemoteTransport: Send + Sync {
/// Returns `Ok(true)` if the binary is installed and executable,
/// `Ok(false)` if it is definitively not installed, and
/// `Err(_)` if the check failed (e.g. SSH timeout/unreachable).
fn check_binary(&self) -> impl Future<Output = Result<bool, String>> + Send;
fn check_binary(
&self,
) -> Pin<Box<dyn std::future::Future<Output = Result<bool, String>> + Send>>;
/// Installs the remote server binary on the remote host.
///
@@ -82,7 +90,9 @@ pub trait RemoteTransport: Send + Sync {
///
/// Returns `Ok(())` if the install succeeded, and
/// `Err(_)` if the install failed (e.g. SSH timeout, script error).
fn install_binary(&self) -> impl Future<Output = Result<(), String>> + Send;
fn install_binary(
&self,
) -> Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send>>;
/// Establish a new connection to the remote server.
///
@@ -96,6 +106,6 @@ pub trait RemoteTransport: Send + Sync {
/// a socket). Stderr forwarding to local logging should also happen here.
fn connect(
&self,
executor: &executor::Background,
) -> impl Future<Output = anyhow::Result<Connection>> + Send;
executor: std::sync::Arc<executor::Background>,
) -> Pin<Box<dyn std::future::Future<Output = anyhow::Result<Connection>> + Send>>;
}