mirror of
https://github.com/warpdotdev/warp.git
synced 2026-06-13 10:25:06 +08:00
Add reconnect logic to remote server
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -10559,6 +10559,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"async-io",
|
||||
"async-process",
|
||||
"command",
|
||||
"dashmap",
|
||||
|
||||
@@ -3,13 +3,18 @@
|
||||
//! [`SshTransport`] uses an existing SSH ControlMaster socket to check/install
|
||||
//! the remote server binary and to launch the `remote-server-proxy` process
|
||||
//! whose stdin/stdout become the protocol channel.
|
||||
use std::future::Future;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::process::Stdio;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use warpui::r#async::executor;
|
||||
|
||||
use remote_server::client::RemoteServerClient;
|
||||
use remote_server::setup::RemotePlatform;
|
||||
use remote_server::setup::{self, RemotePlatform, CHECK_TIMEOUT, INSTALL_TIMEOUT};
|
||||
use remote_server::ssh::{run_ssh_command, run_ssh_script, ssh_args};
|
||||
use remote_server::transport::{Connection, RemoteTransport};
|
||||
|
||||
/// SSH transport: connects via a ControlMaster socket.
|
||||
@@ -18,7 +23,7 @@ use remote_server::transport::{Connection, RemoteTransport};
|
||||
/// process (`ssh -N -o ControlMaster=yes -o ControlPath=<path>`). All SSH
|
||||
/// commands (binary check, install, proxy launch) are multiplexed through
|
||||
/// this socket without re-authenticating.
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SshTransport {
|
||||
socket_path: PathBuf,
|
||||
}
|
||||
@@ -28,123 +33,112 @@ impl SshTransport {
|
||||
Self { socket_path }
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteTransport for SshTransport {
|
||||
async fn detect_platform(&self) -> Result<RemotePlatform, String> {
|
||||
match remote_server::ssh::run_ssh_command(
|
||||
&self.socket_path,
|
||||
"uname -sm",
|
||||
remote_server::setup::CHECK_TIMEOUT,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(output) if output.status.success() => {
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
remote_server::setup::parse_uname_output(&stdout).map_err(|e| format!("{e:#}"))
|
||||
}
|
||||
Ok(output) => {
|
||||
let code = output.status.code().unwrap_or(-1);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
Err(format!("uname -sm exited with code {code}: {stderr}"))
|
||||
}
|
||||
Err(e) => Err(format!("{e:#}")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_binary(&self) -> Result<bool, String> {
|
||||
let bin_path = remote_server::setup::remote_server_binary();
|
||||
log::info!("Checking for remote server binary at {bin_path}");
|
||||
match remote_server::ssh::run_ssh_command(
|
||||
&self.socket_path,
|
||||
&remote_server::setup::binary_check_command(),
|
||||
remote_server::setup::CHECK_TIMEOUT,
|
||||
)
|
||||
.await
|
||||
{
|
||||
// `test -x` exits 0 when present, 1 when missing.
|
||||
// Any other exit code (or None / signal) is treated as a check failure.
|
||||
Ok(output) => match output.status.code() {
|
||||
Some(0) => Ok(true),
|
||||
Some(1) => Ok(false),
|
||||
Some(code) => {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
Err(format!("binary check exited with code {code}: {stderr}"))
|
||||
fn detect_platform(
|
||||
&self,
|
||||
) -> Pin<Box<dyn Future<Output = Result<RemotePlatform, String>> + Send>> {
|
||||
let socket_path = self.socket_path.clone();
|
||||
Box::pin(async move {
|
||||
match run_ssh_command(&socket_path, "uname -sm", CHECK_TIMEOUT).await {
|
||||
Ok(output) if output.status.success() => {
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
setup::parse_uname_output(&stdout).map_err(|e| format!("{e:#}"))
|
||||
}
|
||||
None => Err("binary check terminated by signal".into()),
|
||||
},
|
||||
Err(e) => Err(format!("{e:#}")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn install_binary(&self) -> Result<(), String> {
|
||||
let script = remote_server::setup::install_script();
|
||||
log::info!(
|
||||
"Installing remote server binary to {}",
|
||||
remote_server::setup::remote_server_binary()
|
||||
);
|
||||
match remote_server::ssh::run_ssh_script(
|
||||
&self.socket_path,
|
||||
&script,
|
||||
remote_server::setup::INSTALL_TIMEOUT,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(output) if output.status.success() => Ok(()),
|
||||
Ok(output) => {
|
||||
let code = output.status.code().unwrap_or(-1);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
Err(format!("install script failed (exit {code}): {stderr}"))
|
||||
Ok(output) => {
|
||||
let code = output.status.code().unwrap_or(-1);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
Err(format!("uname -sm exited with code {code}: {stderr}"))
|
||||
}
|
||||
Err(e) => Err(format!("{e:#}")),
|
||||
}
|
||||
Err(e) => Err(format!("{e:#}")),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn connect(&self, executor: &executor::Background) -> Result<Connection> {
|
||||
let binary = remote_server::setup::remote_server_binary();
|
||||
let mut args = remote_server::ssh::ssh_args(&self.socket_path);
|
||||
args.push(format!("{binary} remote-server-proxy"));
|
||||
fn check_binary(&self) -> Pin<Box<dyn Future<Output = Result<bool, String>> + Send>> {
|
||||
let socket_path = self.socket_path.clone();
|
||||
Box::pin(async move {
|
||||
let bin_path = setup::remote_server_binary();
|
||||
log::info!("Checking for remote server binary at {bin_path}");
|
||||
match run_ssh_command(&socket_path, &setup::binary_check_command(), CHECK_TIMEOUT).await
|
||||
{
|
||||
Ok(output) => match output.status.code() {
|
||||
Some(0) => Ok(true),
|
||||
Some(1) => Ok(false),
|
||||
Some(code) => {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
Err(format!("binary check exited with code {code}: {stderr}"))
|
||||
}
|
||||
None => Err("binary check terminated by signal".into()),
|
||||
},
|
||||
Err(e) => Err(format!("{e:#}")),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// `kill_on_drop(true)` pairs with ownership of the `Child` being
|
||||
// returned in the [`Connection`] below: the
|
||||
// [`RemoteServerManager`] holds the `Child` on its per-session
|
||||
// state, and dropping that state (on explicit teardown or
|
||||
// spontaneous disconnect) sends SIGKILL to this ssh process.
|
||||
// Without this the ssh child is orphaned and keeps a channel
|
||||
// open on the ControlMaster socket, blocking the master from
|
||||
// exiting cleanly when the user logs out.
|
||||
//
|
||||
// Note that the child's lifetime is decoupled from any
|
||||
// `Arc<RemoteServerClient>` clones: other owners (e.g. the
|
||||
// per-session command executor) can keep the client alive for
|
||||
// their own purposes without pinning the subprocess.
|
||||
let mut child = command::r#async::Command::new("ssh")
|
||||
.args(&args)
|
||||
.stdin(std::process::Stdio::piped())
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.kill_on_drop(true)
|
||||
.spawn()?;
|
||||
fn install_binary(&self) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
|
||||
let socket_path = self.socket_path.clone();
|
||||
Box::pin(async move {
|
||||
let script = setup::install_script();
|
||||
log::info!(
|
||||
"Installing remote server binary to {}",
|
||||
setup::remote_server_binary()
|
||||
);
|
||||
match run_ssh_script(&socket_path, &script, INSTALL_TIMEOUT).await {
|
||||
Ok(output) if output.status.success() => Ok(()),
|
||||
Ok(output) => {
|
||||
let code = output.status.code().unwrap_or(-1);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
Err(format!("install script failed (exit {code}): {stderr}"))
|
||||
}
|
||||
Err(e) => Err(format!("{e:#}")),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
let stdin = child
|
||||
.stdin
|
||||
.take()
|
||||
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stdin"))?;
|
||||
let stdout = child
|
||||
.stdout
|
||||
.take()
|
||||
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stdout"))?;
|
||||
let stderr = child
|
||||
.stderr
|
||||
.take()
|
||||
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stderr"))?;
|
||||
fn connect(
|
||||
&self,
|
||||
executor: Arc<executor::Background>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<Connection>> + Send>> {
|
||||
let socket_path = self.socket_path.clone();
|
||||
Box::pin(async move {
|
||||
let binary = setup::remote_server_binary();
|
||||
let mut args = ssh_args(&socket_path);
|
||||
args.push(format!("{binary} remote-server-proxy"));
|
||||
|
||||
let (client, event_rx) =
|
||||
RemoteServerClient::from_child_streams(stdin, stdout, stderr, executor);
|
||||
Ok(Connection {
|
||||
client,
|
||||
event_rx,
|
||||
child,
|
||||
control_path: Some(self.socket_path.clone()),
|
||||
// `kill_on_drop(true)` pairs with ownership of the `Child` being
|
||||
// returned in the [`Connection`] below: the
|
||||
// [`RemoteServerManager`] holds the `Child` on its per-session
|
||||
// state, and dropping that state (on explicit teardown or
|
||||
// spontaneous disconnect) sends SIGKILL to this ssh process.
|
||||
let mut child = command::r#async::Command::new("ssh")
|
||||
.args(&args)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.kill_on_drop(true)
|
||||
.spawn()?;
|
||||
|
||||
let stdin = child
|
||||
.stdin
|
||||
.take()
|
||||
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stdin"))?;
|
||||
let stdout = child
|
||||
.stdout
|
||||
.take()
|
||||
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stdout"))?;
|
||||
let stderr = child
|
||||
.stderr
|
||||
.take()
|
||||
.ok_or_else(|| anyhow::anyhow!("Failed to capture child stderr"))?;
|
||||
|
||||
let (client, event_rx) =
|
||||
RemoteServerClient::from_child_streams(stdin, stdout, stderr, &executor);
|
||||
Ok(Connection {
|
||||
client,
|
||||
event_rx,
|
||||
child,
|
||||
control_path: Some(socket_path),
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,6 +170,7 @@ impl Sessions {
|
||||
| RemoteServerManagerEvent::SetupStateChanged { .. }
|
||||
| RemoteServerManagerEvent::BinaryCheckComplete { .. }
|
||||
| RemoteServerManagerEvent::BinaryInstallComplete { .. }
|
||||
| RemoteServerManagerEvent::SessionReconnected { .. }
|
||||
| RemoteServerManagerEvent::ClientRequestFailed { .. }
|
||||
| RemoteServerManagerEvent::ServerMessageDecodingError { .. } => {}
|
||||
});
|
||||
|
||||
@@ -4376,6 +4376,7 @@ impl TerminalView {
|
||||
}
|
||||
}
|
||||
RemoteServerManagerEvent::SessionConnecting { .. }
|
||||
| RemoteServerManagerEvent::SessionReconnected { .. }
|
||||
| RemoteServerManagerEvent::HostConnected { .. }
|
||||
| RemoteServerManagerEvent::HostDisconnected { .. }
|
||||
| RemoteServerManagerEvent::RepoMetadataSnapshot { .. }
|
||||
|
||||
@@ -24,6 +24,7 @@ warp_util.workspace = true
|
||||
warpui.workspace = true
|
||||
|
||||
[target.'cfg(not(target_family = "wasm"))'.dependencies]
|
||||
async-io.workspace = true
|
||||
async-process.workspace = true
|
||||
|
||||
[target.'cfg(target_family = "wasm")'.dependencies]
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
use crate::client::ClientEvent;
|
||||
@@ -17,6 +18,11 @@ use serde::Serialize;
|
||||
use warp_core::SessionId;
|
||||
use warpui::{Entity, ModelContext, ModelSpawner, SingletonEntity};
|
||||
|
||||
/// Maximum number of reconnection attempts after a spontaneous disconnect.
|
||||
const MAX_RECONNECT_ATTEMPTS: u32 = 2;
|
||||
/// Delay between reconnection attempts.
|
||||
const RECONNECT_DELAY: Duration = Duration::from_secs(2);
|
||||
|
||||
/// Which phase of the remote server connection flow failed.
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
@@ -45,6 +51,16 @@ pub enum RemoteServerErrorKind {
|
||||
Other,
|
||||
}
|
||||
|
||||
/// Exit status information captured from the remote server subprocess
|
||||
/// when the connection drops. Used for diagnostics and telemetry.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct RemoteServerExitStatus {
|
||||
/// Process exit code, if the process exited normally.
|
||||
pub code: Option<i32>,
|
||||
/// True if the process was killed by a signal (Unix only).
|
||||
pub signal_killed: bool,
|
||||
}
|
||||
|
||||
impl RemoteServerErrorKind {
|
||||
/// Classify a [`ClientError`] into a telemetry error kind.
|
||||
pub fn from_client_error(error: &crate::client::ClientError) -> Self {
|
||||
@@ -104,6 +120,16 @@ pub enum RemoteSessionState {
|
||||
/// See type-level doc.
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
control_path: Option<PathBuf>,
|
||||
/// Transport stored for reconnection after spontaneous disconnect.
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
transport: Option<Arc<dyn RemoteTransport>>,
|
||||
},
|
||||
/// A reconnection attempt is in progress after a spontaneous disconnect.
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
Reconnecting {
|
||||
attempt: u32,
|
||||
host_id: HostId,
|
||||
control_path: Option<PathBuf>,
|
||||
},
|
||||
/// Connection dropped (EOF/error from the reader task).
|
||||
Disconnected,
|
||||
@@ -145,6 +171,19 @@ pub enum RemoteServerManagerEvent {
|
||||
SessionDisconnected {
|
||||
session_id: SessionId,
|
||||
host_id: HostId,
|
||||
/// Exit status of the remote server subprocess, if available.
|
||||
/// `None` when the session was explicitly deregistered or when
|
||||
/// the exit status could not be determined.
|
||||
exit_status: Option<RemoteServerExitStatus>,
|
||||
},
|
||||
/// A reconnection attempt succeeded. Downstream owners (e.g.
|
||||
/// `RemoteServerCommandExecutor`) should swap their client reference
|
||||
/// to the new one carried in `client`.
|
||||
SessionReconnected {
|
||||
session_id: SessionId,
|
||||
host_id: HostId,
|
||||
attempt: u32,
|
||||
client: Arc<RemoteServerClient>,
|
||||
},
|
||||
/// The manager is no longer tracking this session -- it has been
|
||||
/// removed from the `sessions` map via `deregister_session`. Fires
|
||||
@@ -417,10 +456,7 @@ impl RemoteServerManager {
|
||||
{
|
||||
log::info!("Starting remote server connection for session {session_id:?}");
|
||||
|
||||
// Advance the user-visible setup pipeline. Both callers (binary
|
||||
// already installed, and binary just installed) enter this
|
||||
// method right when the Initializing phase begins, so we emit
|
||||
// the state change from one place.
|
||||
// Advance the user-visible setup pipeline.
|
||||
ctx.emit(RemoteServerManagerEvent::SetupStateChanged {
|
||||
session_id,
|
||||
state: RemoteServerSetupState::Initializing,
|
||||
@@ -435,95 +471,23 @@ impl RemoteServerManager {
|
||||
|
||||
ctx.background_executor()
|
||||
.spawn(async move {
|
||||
// ---- Phase 1: Connect (establish streams, create client) ----
|
||||
match transport.connect(&executor).await {
|
||||
Ok(Connection {
|
||||
client,
|
||||
event_rx,
|
||||
child,
|
||||
control_path,
|
||||
}) => {
|
||||
let client = Arc::new(client);
|
||||
|
||||
// Transition to Initializing and start draining
|
||||
// the event channel for push events and disconnect.
|
||||
// The `Child` is stashed on the session state so
|
||||
// its lifetime is controlled by the manager -- on
|
||||
// teardown the state is dropped, which runs the
|
||||
// `Child`'s destructor and SIGKILLs the subprocess
|
||||
// via `kill_on_drop`. `control_path` is stashed
|
||||
// for explicit teardown's `ssh -O exit` call.
|
||||
let client_for_state = Arc::clone(&client);
|
||||
match Self::run_connect_and_handshake(
|
||||
session_id,
|
||||
&transport,
|
||||
&spawner,
|
||||
&executor,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(host_id) => {
|
||||
let _ = spawner
|
||||
.spawn(move |me, ctx| {
|
||||
me.sessions.insert(
|
||||
session_id,
|
||||
RemoteSessionState::Initializing {
|
||||
client: client_for_state,
|
||||
_child: child,
|
||||
control_path,
|
||||
},
|
||||
);
|
||||
|
||||
// Drain the event channel on the main thread.
|
||||
// Each push event is forwarded as a manager
|
||||
// event in real-time. When the stream closes
|
||||
// (after Disconnected or channel drop), we
|
||||
// transition the session to Disconnected.
|
||||
ctx.spawn_stream_local(
|
||||
event_rx,
|
||||
move |me, event, ctx| {
|
||||
me.forward_client_event(session_id, event, ctx);
|
||||
},
|
||||
move |me, ctx| {
|
||||
me.mark_session_disconnected(session_id, ctx);
|
||||
},
|
||||
);
|
||||
me.mark_session_connected(session_id, host_id, None, ctx);
|
||||
})
|
||||
.await;
|
||||
|
||||
// ---- Phase 2: Initialize handshake ----
|
||||
match client.initialize().await {
|
||||
Ok(resp) => {
|
||||
let host_id = HostId::new(resp.host_id);
|
||||
let _ = spawner
|
||||
.spawn(move |me, ctx| {
|
||||
me.mark_session_connected(session_id, host_id, ctx);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
"Initialize handshake failed for session {session_id:?}: {e}"
|
||||
);
|
||||
let error = format!("{e:#}");
|
||||
let _ = spawner
|
||||
.spawn(move |me, ctx| {
|
||||
ctx.emit(
|
||||
RemoteServerManagerEvent::SetupStateChanged {
|
||||
session_id,
|
||||
state: RemoteServerSetupState::Failed {
|
||||
error: error.clone(),
|
||||
},
|
||||
},
|
||||
);
|
||||
ctx.emit(
|
||||
RemoteServerManagerEvent::SessionConnectionFailed {
|
||||
session_id,
|
||||
phase: RemoteServerInitPhase::Initialize,
|
||||
error,
|
||||
},
|
||||
);
|
||||
me.mark_session_disconnected(session_id, ctx);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
"Failed to connect remote server for session {session_id:?}: {e:#}"
|
||||
);
|
||||
log::error!("Connection failed for session {session_id:?}: {e:#}");
|
||||
let error = format!("{e:#}");
|
||||
let _ = spawner
|
||||
.spawn(move |me, ctx| {
|
||||
@@ -548,6 +512,65 @@ impl RemoteServerManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared connect + handshake logic used by both `connect_session` and
|
||||
/// `attempt_reconnect`.
|
||||
///
|
||||
/// 1. Calls `transport.connect()` to establish streams.
|
||||
/// 2. Transitions the session to `Initializing` and starts draining the
|
||||
/// event channel.
|
||||
/// 3. Runs the initialize handshake.
|
||||
///
|
||||
/// Returns `Ok(host_id)` on success, or an error if either phase fails.
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
async fn run_connect_and_handshake(
|
||||
session_id: SessionId,
|
||||
transport: &dyn RemoteTransport,
|
||||
spawner: &ModelSpawner<Self>,
|
||||
executor: &Arc<warpui::r#async::executor::Background>,
|
||||
) -> anyhow::Result<HostId> {
|
||||
// Phase 1: Connect (establish streams, create client).
|
||||
let Connection {
|
||||
client,
|
||||
event_rx,
|
||||
child,
|
||||
control_path,
|
||||
} = transport.connect(executor.clone()).await?;
|
||||
|
||||
let client = Arc::new(client);
|
||||
let client_for_init = Arc::clone(&client);
|
||||
|
||||
// Transition to Initializing and start draining the event channel.
|
||||
let _ = spawner
|
||||
.spawn(move |me, ctx| {
|
||||
me.sessions.insert(
|
||||
session_id,
|
||||
RemoteSessionState::Initializing {
|
||||
client: client_for_init,
|
||||
_child: child,
|
||||
control_path,
|
||||
},
|
||||
);
|
||||
|
||||
ctx.spawn_stream_local(
|
||||
event_rx,
|
||||
move |me, event, ctx| {
|
||||
me.forward_client_event(session_id, event, ctx);
|
||||
},
|
||||
move |me, ctx| {
|
||||
me.mark_session_disconnected(session_id, ctx);
|
||||
},
|
||||
);
|
||||
})
|
||||
.await;
|
||||
|
||||
// Phase 2: Initialize handshake.
|
||||
let resp = client
|
||||
.initialize()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{e:#}"))?;
|
||||
Ok(HostId::new(resp.host_id))
|
||||
}
|
||||
|
||||
/// Removes a session from the manager and tears down its connection.
|
||||
///
|
||||
/// Assumes the caller has already observed that the user's shell
|
||||
@@ -604,19 +627,26 @@ impl RemoteServerManager {
|
||||
let control_path = match &prev {
|
||||
Some(RemoteSessionState::Connected { control_path, .. })
|
||||
| Some(RemoteSessionState::Initializing { control_path, .. }) => control_path.clone(),
|
||||
Some(RemoteSessionState::Reconnecting { control_path, .. }) => control_path.clone(),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if let Some(RemoteSessionState::Connected { host_id, .. }) = prev {
|
||||
// Extract `host_id` from states that track a host connection.
|
||||
let host_id = match &prev {
|
||||
Some(RemoteSessionState::Connected { host_id, .. }) => Some(host_id.clone()),
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
Some(RemoteSessionState::Reconnecting { host_id, .. }) => Some(host_id.clone()),
|
||||
_ => None,
|
||||
};
|
||||
if let Some(host_id) = host_id {
|
||||
self.remove_from_host_index(&host_id, session_id);
|
||||
ctx.emit(RemoteServerManagerEvent::SessionDisconnected {
|
||||
session_id,
|
||||
host_id: host_id.clone(),
|
||||
exit_status: None,
|
||||
});
|
||||
if !self.host_to_sessions.contains_key(&host_id) {
|
||||
ctx.emit(RemoteServerManagerEvent::HostDisconnected {
|
||||
host_id: host_id.clone(),
|
||||
});
|
||||
ctx.emit(RemoteServerManagerEvent::HostDisconnected { host_id });
|
||||
}
|
||||
}
|
||||
ctx.emit(RemoteServerManagerEvent::SessionDeregistered { session_id });
|
||||
@@ -856,18 +886,21 @@ impl RemoteServerManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Transitions a session from `Initializing` to `Connected`. Accepts an
|
||||
/// optional `transport` for reconnection support. When called from the
|
||||
/// initial connect path the transport is `None`. When called from the
|
||||
/// reconnect path the transport is carried forward.
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
fn mark_session_connected(
|
||||
&mut self,
|
||||
session_id: SessionId,
|
||||
host_id: HostId,
|
||||
transport: Option<Arc<dyn RemoteTransport>>,
|
||||
ctx: &mut ModelContext<Self>,
|
||||
) {
|
||||
log::info!("Remote server connected for session {session_id:?}, host {host_id}");
|
||||
|
||||
// Only transition if the session is still in Initializing state.
|
||||
// Remove first so we can move the client handle (and owned `Child`)
|
||||
// out.
|
||||
let Some(RemoteSessionState::Initializing {
|
||||
client,
|
||||
_child,
|
||||
@@ -881,10 +914,11 @@ impl RemoteServerManager {
|
||||
self.sessions.insert(
|
||||
session_id,
|
||||
RemoteSessionState::Connected {
|
||||
client,
|
||||
client: client.clone(),
|
||||
host_id: host_id.clone(),
|
||||
_child,
|
||||
control_path,
|
||||
transport,
|
||||
},
|
||||
);
|
||||
self.host_to_sessions
|
||||
@@ -922,6 +956,45 @@ impl RemoteServerManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Captures the exit status from a `Child` process, if available.
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
fn capture_exit_status(
|
||||
child: &mut async_process::Child,
|
||||
session_id: SessionId,
|
||||
) -> Option<RemoteServerExitStatus> {
|
||||
match child.try_status() {
|
||||
Ok(Some(status)) => {
|
||||
let code = status.code();
|
||||
#[cfg(unix)]
|
||||
let signal_killed = {
|
||||
use std::os::unix::process::ExitStatusExt;
|
||||
status.signal().is_some()
|
||||
};
|
||||
#[cfg(not(unix))]
|
||||
let signal_killed = false;
|
||||
log::warn!(
|
||||
"Remote server process exited for session {session_id:?}: \
|
||||
code={code:?}, signal_killed={signal_killed}"
|
||||
);
|
||||
Some(RemoteServerExitStatus {
|
||||
code,
|
||||
signal_killed,
|
||||
})
|
||||
}
|
||||
Ok(None) => {
|
||||
log::warn!(
|
||||
"Remote server process still running for session {session_id:?} \
|
||||
despite EOF on reader task"
|
||||
);
|
||||
None
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Failed to read exit status for session {session_id:?}: {e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
pub(crate) fn mark_session_disconnected(
|
||||
&mut self,
|
||||
@@ -932,20 +1005,201 @@ impl RemoteServerManager {
|
||||
let Some(prev) = self.sessions.remove(&session_id) else {
|
||||
return;
|
||||
};
|
||||
self.sessions
|
||||
.insert(session_id, RemoteSessionState::Disconnected);
|
||||
|
||||
if let RemoteSessionState::Connected { host_id, .. } = prev {
|
||||
self.remove_from_host_index(&host_id, session_id);
|
||||
// Emit `SessionDisconnected` before `HostDisconnected` so that
|
||||
// subscribers (e.g. the command executor) drop their
|
||||
// `Arc<RemoteServerClient>` reference before any host-scoped
|
||||
// teardown runs. This matches the ordering in
|
||||
// `deregister_session` so both teardown paths look identical
|
||||
// to subscribers.
|
||||
// Only attempt reconnect for sessions that were in Connected state
|
||||
// with a transport available, and not being explicitly deregistered.
|
||||
if let RemoteSessionState::Connected {
|
||||
host_id,
|
||||
mut _child,
|
||||
control_path,
|
||||
transport,
|
||||
..
|
||||
} = prev
|
||||
{
|
||||
let exit_status = Self::capture_exit_status(&mut _child, session_id);
|
||||
// Drop the old child process explicitly before reconnecting.
|
||||
drop(_child);
|
||||
|
||||
let can_reconnect = transport.is_some();
|
||||
|
||||
if can_reconnect {
|
||||
let transport = transport.unwrap();
|
||||
log::info!(
|
||||
"Spontaneous disconnect for session {session_id:?}, \
|
||||
will attempt reconnect (transport={transport:?})"
|
||||
);
|
||||
|
||||
// Clear stale repo metadata and host index so downstream
|
||||
// models don't hold onto data from the dead server process.
|
||||
self.remove_from_host_index(&host_id, session_id);
|
||||
if !self.host_to_sessions.contains_key(&host_id) {
|
||||
ctx.emit(RemoteServerManagerEvent::HostDisconnected {
|
||||
host_id: host_id.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
// Clear last navigated path so navigate_to_directory
|
||||
// re-fires after reconnect.
|
||||
self.last_navigated_path.remove(&session_id);
|
||||
|
||||
self.attempt_reconnect(
|
||||
session_id,
|
||||
1,
|
||||
host_id,
|
||||
exit_status,
|
||||
transport,
|
||||
control_path,
|
||||
ctx,
|
||||
);
|
||||
} else {
|
||||
// No reconnect — fall through to terminal Disconnected.
|
||||
self.sessions
|
||||
.insert(session_id, RemoteSessionState::Disconnected);
|
||||
self.remove_from_host_index(&host_id, session_id);
|
||||
ctx.emit(RemoteServerManagerEvent::SessionDisconnected {
|
||||
session_id,
|
||||
host_id: host_id.clone(),
|
||||
exit_status,
|
||||
});
|
||||
if !self.host_to_sessions.contains_key(&host_id) {
|
||||
ctx.emit(RemoteServerManagerEvent::HostDisconnected { host_id });
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Non-Connected states (Initializing, Connecting, etc.) —
|
||||
// no reconnect, just mark disconnected.
|
||||
self.sessions
|
||||
.insert(session_id, RemoteSessionState::Disconnected);
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to re-establish the remote server connection.
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
fn attempt_reconnect(
|
||||
&mut self,
|
||||
session_id: SessionId,
|
||||
attempt: u32,
|
||||
host_id: HostId,
|
||||
exit_status: Option<RemoteServerExitStatus>,
|
||||
transport: Arc<dyn RemoteTransport>,
|
||||
control_path: Option<PathBuf>,
|
||||
ctx: &mut ModelContext<Self>,
|
||||
) {
|
||||
log::info!(
|
||||
"Attempting reconnect for session {session_id:?} \
|
||||
(attempt {attempt}/{MAX_RECONNECT_ATTEMPTS})"
|
||||
);
|
||||
|
||||
self.sessions.insert(
|
||||
session_id,
|
||||
RemoteSessionState::Reconnecting {
|
||||
attempt,
|
||||
host_id: host_id.clone(),
|
||||
control_path: control_path.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
let spawner = self.spawner.clone();
|
||||
let executor = ctx.background_executor().clone();
|
||||
let transport_clone = Arc::clone(&transport);
|
||||
|
||||
ctx.background_executor()
|
||||
.spawn(async move {
|
||||
async_io::Timer::after(RECONNECT_DELAY).await;
|
||||
|
||||
// Check if the session was deregistered during the delay.
|
||||
// (Checked via spawner since sessions lives on the main thread.)
|
||||
let was_removed = spawner
|
||||
.spawn(move |me, _ctx| !me.sessions.contains_key(&session_id))
|
||||
.await
|
||||
.unwrap_or(true);
|
||||
if was_removed {
|
||||
log::info!("Session {session_id:?} removed during reconnect delay, aborting");
|
||||
return;
|
||||
}
|
||||
|
||||
match Self::run_connect_and_handshake(
|
||||
session_id,
|
||||
&*transport_clone,
|
||||
&spawner,
|
||||
&executor,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(new_host_id) => {
|
||||
let _ = spawner
|
||||
.spawn(move |me, ctx| {
|
||||
me.mark_session_connected(
|
||||
session_id,
|
||||
new_host_id.clone(),
|
||||
Some(transport),
|
||||
ctx,
|
||||
);
|
||||
if let Some(client) = me.client_for_session(session_id).cloned() {
|
||||
ctx.emit(RemoteServerManagerEvent::SessionReconnected {
|
||||
session_id,
|
||||
host_id: new_host_id,
|
||||
attempt,
|
||||
client,
|
||||
});
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
"Reconnect failed for session {session_id:?} \
|
||||
(attempt {attempt}): {e:#}"
|
||||
);
|
||||
let _ = spawner
|
||||
.spawn(move |me, ctx| {
|
||||
me.handle_reconnect_failure(
|
||||
session_id,
|
||||
attempt,
|
||||
host_id,
|
||||
exit_status,
|
||||
transport,
|
||||
control_path,
|
||||
ctx,
|
||||
);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
}
|
||||
|
||||
/// Handle a failed reconnection attempt: either retry or give up.
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
fn handle_reconnect_failure(
|
||||
&mut self,
|
||||
session_id: SessionId,
|
||||
attempt: u32,
|
||||
host_id: HostId,
|
||||
exit_status: Option<RemoteServerExitStatus>,
|
||||
transport: Arc<dyn RemoteTransport>,
|
||||
control_path: Option<PathBuf>,
|
||||
ctx: &mut ModelContext<Self>,
|
||||
) {
|
||||
if attempt < MAX_RECONNECT_ATTEMPTS {
|
||||
self.attempt_reconnect(
|
||||
session_id,
|
||||
attempt + 1,
|
||||
host_id,
|
||||
exit_status,
|
||||
transport,
|
||||
control_path,
|
||||
ctx,
|
||||
);
|
||||
} else {
|
||||
log::warn!("Reconnect exhausted for session {session_id:?} after {attempt} attempt(s)");
|
||||
self.sessions
|
||||
.insert(session_id, RemoteSessionState::Disconnected);
|
||||
ctx.emit(RemoteServerManagerEvent::SessionDisconnected {
|
||||
session_id,
|
||||
host_id: host_id.clone(),
|
||||
exit_status,
|
||||
});
|
||||
if !self.host_to_sessions.contains_key(&host_id) {
|
||||
ctx.emit(RemoteServerManagerEvent::HostDisconnected { host_id });
|
||||
|
||||
@@ -6,13 +6,13 @@
|
||||
//! in-process for tests) implement the same trait without touching the
|
||||
//! manager.
|
||||
//!
|
||||
//! Methods are async. Callers use the trait via generics
|
||||
//! (`T: RemoteTransport`) rather than `dyn` dispatch.
|
||||
//! Returns boxed futures for object safety — the manager stores
|
||||
//! `Arc<dyn RemoteTransport>` for reconnection.
|
||||
//!
|
||||
//! [`RemoteServerManager`]: crate::manager::RemoteServerManager
|
||||
use std::future::Future;
|
||||
#[cfg(not(target_family = "wasm"))]
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
|
||||
use async_channel::Receiver;
|
||||
use warpui::r#async::executor;
|
||||
@@ -56,12 +56,18 @@ pub struct Connection {
|
||||
pub control_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
pub trait RemoteTransport: Send + Sync {
|
||||
/// Transport abstraction for remote server connections.
|
||||
///
|
||||
/// Object-safe: returns boxed futures so implementations can be stored
|
||||
/// as `Arc<dyn RemoteTransport>` for reconnection.
|
||||
pub trait RemoteTransport: Send + Sync + std::fmt::Debug {
|
||||
/// Detects the remote host's OS and architecture by running `uname -sm`.
|
||||
///
|
||||
/// Returns the parsed [`RemotePlatform`] on success, or an error string
|
||||
/// if the command fails or the output cannot be parsed.
|
||||
fn detect_platform(&self) -> impl Future<Output = Result<RemotePlatform, String>> + Send;
|
||||
fn detect_platform(
|
||||
&self,
|
||||
) -> Pin<Box<dyn std::future::Future<Output = Result<RemotePlatform, String>> + Send>>;
|
||||
|
||||
/// Checks whether the remote server binary is present on the remote host.
|
||||
///
|
||||
@@ -72,7 +78,9 @@ pub trait RemoteTransport: Send + Sync {
|
||||
/// Returns `Ok(true)` if the binary is installed and executable,
|
||||
/// `Ok(false)` if it is definitively not installed, and
|
||||
/// `Err(_)` if the check failed (e.g. SSH timeout/unreachable).
|
||||
fn check_binary(&self) -> impl Future<Output = Result<bool, String>> + Send;
|
||||
fn check_binary(
|
||||
&self,
|
||||
) -> Pin<Box<dyn std::future::Future<Output = Result<bool, String>> + Send>>;
|
||||
|
||||
/// Installs the remote server binary on the remote host.
|
||||
///
|
||||
@@ -82,7 +90,9 @@ pub trait RemoteTransport: Send + Sync {
|
||||
///
|
||||
/// Returns `Ok(())` if the install succeeded, and
|
||||
/// `Err(_)` if the install failed (e.g. SSH timeout, script error).
|
||||
fn install_binary(&self) -> impl Future<Output = Result<(), String>> + Send;
|
||||
fn install_binary(
|
||||
&self,
|
||||
) -> Pin<Box<dyn std::future::Future<Output = Result<(), String>> + Send>>;
|
||||
|
||||
/// Establish a new connection to the remote server.
|
||||
///
|
||||
@@ -96,6 +106,6 @@ pub trait RemoteTransport: Send + Sync {
|
||||
/// a socket). Stderr forwarding to local logging should also happen here.
|
||||
fn connect(
|
||||
&self,
|
||||
executor: &executor::Background,
|
||||
) -> impl Future<Output = anyhow::Result<Connection>> + Send;
|
||||
executor: std::sync::Arc<executor::Background>,
|
||||
) -> Pin<Box<dyn std::future::Future<Output = anyhow::Result<Connection>> + Send>>;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user