From deb1dbedbb3a2bf2a485fdee33b27efd9d0f4652 Mon Sep 17 00:00:00 2001 From: yxrxy Date: Tue, 24 Feb 2026 20:49:57 +0800 Subject: [PATCH] fix(ftps): Fix basename extraction and implement recursive delete (#1920) Co-authored-by: houseme Co-authored-by: heihutu <30542132+heihutu@users.noreply.github.com> --- crates/protocols/src/ftps/driver.rs | 160 +++++++++++++++++++++++++--- 1 file changed, 143 insertions(+), 17 deletions(-) diff --git a/crates/protocols/src/ftps/driver.rs b/crates/protocols/src/ftps/driver.rs index c965172d8..a45982085 100644 --- a/crates/protocols/src/ftps/driver.rs +++ b/crates/protocols/src/ftps/driver.rs @@ -141,6 +141,80 @@ where Ok((bucket, key)) } + + /// Recursively delete all objects in a bucket, then delete the bucket itself. + async fn delete_bucket_recursively( + &self, + bucket: &str, + session_context: &crate::common::session::SessionContext, + ) -> Result<()> { + // First, delete all objects in the bucket (with pagination) + let mut continuation_token = None; + loop { + let mut list_input = ListObjectsV2Input::builder().bucket(bucket.to_string()); + + if let Some(token) = continuation_token { + list_input = list_input.continuation_token(token); + } + + let list_input = list_input.build().map_err(|e| { + Error::new(ErrorKind::PermanentFileNotAvailable, format!("Failed to build ListObjectsV2Input: {}", e)) + })?; + + if let Ok(output) = self + .storage + .list_objects_v2( + list_input, + &session_context.principal.user_identity.credentials.access_key, + &session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + // Delete all objects in this page + if let Some(objects) = output.contents { + for obj in objects { + if let Some(obj_key) = obj.key { + let _ = self + .storage + .delete_object( + bucket, + &obj_key, + &session_context.principal.user_identity.credentials.access_key, + &session_context.principal.user_identity.credentials.secret_key, + ) + .await; + } + } + } + + // Check if there are more objects + if !output.is_truncated.unwrap_or(false) { + break; + } + continuation_token = Some(output.next_continuation_token); + } else { + break; + } + } + + // Then delete the bucket + match self + .storage + .delete_bucket( + bucket, + &session_context.principal.user_identity.credentials.access_key, + &session_context.principal.user_identity.credentials.secret_key, + ) + .await + { + Ok(_) => Ok(()), + Err(e) if e.to_string().contains("NoSuchBucket") => Ok(()), + Err(e) => { + error!("Failed to delete bucket '{}': {}", bucket, e); + Err(Error::new(ErrorKind::PermanentFileNotAvailable, format!("Delete bucket failed: {}", e))) + } + } + } } #[async_trait] @@ -240,9 +314,13 @@ where .await .map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?; + let prefix_with_slash = prefix + .clone() + .map(|p| if p.ends_with('/') { p.to_string() } else { format!("{}/", p) }); + let list_input = ListObjectsV2Input::builder() .bucket(bucket) - .prefix(prefix.map(|p| p.to_string())) + .prefix(prefix_with_slash.clone()) .delimiter(Some("/".to_string())) .build() .map_err(|e| { @@ -265,7 +343,27 @@ where if let Some(objects) = output.contents { for obj in objects { if let Some(key) = obj.key { - let filename = PathBuf::from(key.as_str()); + // Filter: only show files directly in current directory + // Skip files in subdirectories (they should be accessed via cd) + let should_show = if prefix.is_none() { + // Root directory: only show files without "/" + !key.contains('/') + } else { + // Subdirectory: show files starting with prefix + key.starts_with(&prefix_with_slash.clone().unwrap_or_default()) + }; + + if !should_show { + continue; + } + + let filename = PathBuf::from(key.as_str()) + .file_name() + .ok_or_else(|| { + Error::new(ErrorKind::PermanentFileNotAvailable, format!("Invalid filename: {}", key)) + }) + .map(PathBuf::from)?; + let size = obj.size.unwrap_or(0) as u64; let modified = obj.last_modified.map(|dt: s3s::dto::Timestamp| { // Convert s3s Timestamp to SystemTime @@ -291,7 +389,13 @@ where if let Some(common_prefixes) = output.common_prefixes { for prefix in common_prefixes { if let Some(prefix_str) = prefix.prefix { - let dir_name = PathBuf::from(prefix_str.as_str().trim_end_matches('/')); + let dir_name = PathBuf::from(prefix_str.as_str().trim_end_matches('/')) + .file_name() + .ok_or_else(|| { + Error::new(ErrorKind::PermanentFileNotAvailable, format!("Invalid directory: {}", prefix_str)) + }) + .map(PathBuf::from)?; + let metadata = FtpsMetadata { size: 0, modified: Some(std::time::SystemTime::now()), @@ -455,6 +559,11 @@ where .map_err(|e| Error::new(ErrorKind::PermanentFileNotAvailable, format!("{}: {}", "Invalid path", e)))?; if let Some(key) = key { + // Authorize delete object + authorize_operation(session_context, &S3Action::DeleteObject, &bucket, Some(&key)) + .await + .map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?; + // Delete file match self .storage @@ -473,8 +582,18 @@ where } } } else { - // Delete directory (bucket) - not supported in typical FTP - Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Directory deletion not supported")) + // Delete directory (bucket) + // If path ends with '/', treat it as bucket deletion request + if path_str.ends_with('/') { + // Authorize delete bucket + authorize_operation(session_context, &S3Action::DeleteBucket, &bucket, None) + .await + .map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?; + + self.delete_bucket_recursively(&bucket, session_context).await + } else { + Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Directory deletion not supported")) + } } } @@ -516,23 +635,27 @@ where .parse_s3_path(&path_str) .map_err(|e| Error::new(ErrorKind::PermanentFileNotAvailable, format!("{}: {}", "Invalid path", e)))?; - // Delete bucket for directory - match self - .storage - .delete_bucket( - &bucket, - &session_context.principal.user_identity.credentials.access_key, - &session_context.principal.user_identity.credentials.secret_key, - ) + // Authorize delete bucket + authorize_operation(session_context, &S3Action::DeleteBucket, &bucket, None) .await - { + .map_err(|_| Error::new(ErrorKind::PermanentFileNotAvailable, "Access denied"))?; + + // Try to delete bucket recursively + match self.delete_bucket_recursively(&bucket, session_context).await { Ok(_) => { debug!("Successfully removed directory/bucket '{}'", path_str); Ok(()) } Err(e) => { - error!("Failed to remove directory/bucket '{}': {}", path_str, e); - Err(Error::new(ErrorKind::PermanentFileNotAvailable, format!("Rmdir failed: {}", e))) + // Check if error is NoSuchBucket - treat as success (idempotent) + let error_msg = e.to_string(); + if error_msg.contains("NoSuchBucket") || error_msg.contains("does not exist") { + debug!("Bucket '{}' already deleted", bucket); + Ok(()) + } else { + error!("Failed to remove directory/bucket '{}': {}", path_str, e); + Err(e) + } } } } @@ -568,6 +691,9 @@ where let to_str = to.as_ref().to_string_lossy(); debug!("FTPS rename request for user '{}' from '{}' to '{}'", user.username, from_str, to_str); - Err(Error::new(ErrorKind::PermanentFileNotAvailable, "Atomic rename not supported in S3")) + Err(Error::new( + ErrorKind::CommandNotImplemented, + "Rename operation not supported in S3 backend", + )) } }