From 177fe2ab44a8f54ff9dff4ec392f253097675660 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=AD=A3=E8=B6=85?= Date: Sun, 19 Apr 2026 19:33:01 +0800 Subject: [PATCH] fix(replication): clean targets when deleting config (#2599) --- .../src/replication_extension_test.rs | 56 +++++++++ rustfs/src/app/bucket_usecase.rs | 115 +++++++++++++++++- 2 files changed, 169 insertions(+), 2 deletions(-) diff --git a/crates/e2e_test/src/replication_extension_test.rs b/crates/e2e_test/src/replication_extension_test.rs index c41a82e64..493dece36 100644 --- a/crates/e2e_test/src/replication_extension_test.rs +++ b/crates/e2e_test/src/replication_extension_test.rs @@ -190,6 +190,14 @@ async fn put_bucket_replication( Ok(()) } +async fn delete_bucket_replication( + env: &RustFSTestEnvironment, + bucket: &str, +) -> Result> { + let url = format!("{}/{bucket}?replication", env.url); + signed_request(http::Method::DELETE, &url, &env.access_key, &env.secret_key, None, None).await +} + async fn enable_bucket_versioning(env: &RustFSTestEnvironment, bucket: &str) -> Result<(), Box> { let client = env.create_s3_client(); client @@ -1084,6 +1092,54 @@ async fn test_remove_remote_target_rejects_target_used_by_replication() -> Resul Ok(()) } +#[tokio::test] +#[serial] +async fn test_delete_bucket_replication_removes_remote_target() -> Result<(), Box> { + init_logging(); + + let mut source_env = RustFSTestEnvironment::new().await?; + source_env.start_rustfs_server(vec![]).await?; + + let mut target_env = RustFSTestEnvironment::new().await?; + target_env.start_rustfs_server_without_cleanup(vec![]).await?; + + let source_bucket = "replication-delete-config-src"; + let target_bucket = "replication-delete-config-dst"; + + let source_client = source_env.create_s3_client(); + let target_client = target_env.create_s3_client(); + + source_client.create_bucket().bucket(source_bucket).send().await?; + target_client.create_bucket().bucket(target_bucket).send().await?; + enable_bucket_versioning(&source_env, source_bucket).await?; + enable_bucket_versioning(&target_env, target_bucket).await?; + + let target_arn = set_replication_target(&source_env, source_bucket, &target_env, target_bucket).await?; + put_bucket_replication(&source_env, source_bucket, &target_arn).await?; + + let delete_response = delete_bucket_replication(&source_env, source_bucket).await?; + assert!( + delete_response.status().is_success(), + "unexpected delete status: {}", + delete_response.status() + ); + + let targets_response = list_replication_targets_request(&source_env, Some(source_bucket)).await?; + assert_eq!(targets_response.status(), StatusCode::OK); + let targets: Vec = targets_response.json().await?; + assert!( + targets + .iter() + .all(|target| target.get("arn").and_then(|arn| arn.as_str()) != Some(target_arn.as_str())), + "deleted replication config left stale target {target_arn}: {targets:?}" + ); + + let recreated_arn = set_replication_target(&source_env, source_bucket, &target_env, target_bucket).await?; + put_bucket_replication(&source_env, source_bucket, &recreated_arn).await?; + + Ok(()) +} + #[tokio::test] #[serial] async fn test_site_replication_resync_start_cancel_restart_real_dual_node() -> Result<(), Box> { diff --git a/rustfs/src/app/bucket_usecase.rs b/rustfs/src/app/bucket_usecase.rs index 92e58d689..8753007aa 100644 --- a/rustfs/src/app/bucket_usecase.rs +++ b/rustfs/src/app/bucket_usecase.rs @@ -34,17 +34,19 @@ use http::StatusCode; use metrics::counter; use rustfs_config::RUSTFS_REGION; use rustfs_ecstore::bucket::{ + bucket_target_sys::BucketTargetSys, lifecycle::bucket_lifecycle_ops::{ enqueue_expiry_for_existing_objects, enqueue_transition_for_existing_objects, validate_transition_tier, }, metadata::{ BUCKET_CORS_CONFIG, BUCKET_LIFECYCLE_CONFIG, BUCKET_NOTIFICATION_CONFIG, BUCKET_POLICY_CONFIG, BUCKET_PUBLIC_ACCESS_BLOCK_CONFIG, BUCKET_REPLICATION_CONFIG, BUCKET_SSECONFIG, BUCKET_TAGGING_CONFIG, - BUCKET_VERSIONING_CONFIG, + BUCKET_TARGETS_FILE, BUCKET_VERSIONING_CONFIG, }, metadata_sys, object_lock::ObjectLockApi, policy_sys::PolicySys, + target::BucketTargetType, utils::serialize, versioning::VersioningApi, versioning_sys::BucketVersioningSys, @@ -99,6 +101,59 @@ fn sr_bucket_meta_item(bucket: String, item_type: &str) -> SRBucketMeta { } } +fn replication_target_arns(config: &ReplicationConfiguration) -> HashSet { + let mut arns = HashSet::new(); + + if !config.role.trim().is_empty() { + arns.insert(config.role.clone()); + return arns; + } + + for rule in &config.rules { + let arn = rule.destination.bucket.trim(); + if !arn.is_empty() { + arns.insert(arn.to_string()); + } + } + + arns +} + +async fn remove_replication_targets_for_config(bucket: &str, config: &ReplicationConfiguration) -> S3Result<()> { + let target_arns = replication_target_arns(config); + if target_arns.is_empty() { + return Ok(()); + } + + let mut targets = match metadata_sys::get_bucket_targets_config(bucket).await { + Ok(targets) => targets, + Err(StorageError::ConfigNotFound) => { + BucketTargetSys::get().update_all_targets(bucket, None).await; + return Ok(()); + } + Err(err) => return Err(ApiError::from(err).into()), + }; + + let original_len = targets.targets.len(); + targets.targets.retain(|target| { + target.target_type != BucketTargetType::ReplicationService || !target_arns.contains(target.arn.as_str()) + }); + + if targets.targets.len() == original_len { + return Ok(()); + } + + let removed = original_len - targets.targets.len(); + let json_targets = serde_json::to_vec(&targets).map_err(to_internal_error)?; + metadata_sys::update(bucket, BUCKET_TARGETS_FILE, json_targets) + .await + .map_err(ApiError::from)?; + BucketTargetSys::get().update_all_targets(bucket, Some(&targets)).await; + info!(bucket = %bucket, removed, "removed replication remote targets referenced by deleted bucket replication config"); + + Ok(()) +} + fn versioning_configuration_has_object_lock_incompatible_settings(config: &VersioningConfiguration) -> bool { config.suspended() || config.exclude_folders.unwrap_or(false) @@ -840,16 +895,26 @@ impl DefaultBucketUsecase { .get_bucket_info(&bucket, &BucketOptions::default()) .await .map_err(ApiError::from)?; + let replication_config = match metadata_sys::get_replication_config(&bucket).await { + Ok((config, _)) => Some(config), + Err(StorageError::ConfigNotFound) => None, + Err(err) => return Err(ApiError::from(err).into()), + }; + metadata_sys::delete(&bucket, BUCKET_REPLICATION_CONFIG) .await .map_err(ApiError::from)?; + if let Some(config) = replication_config.as_ref() + && let Err(err) = remove_replication_targets_for_config(&bucket, config).await + { + warn!(bucket = %bucket, error = ?err, "failed to remove replication targets referenced by deleted bucket replication config"); + } let item = sr_bucket_meta_item(bucket.clone(), "replication-config"); if let Err(err) = site_replication_bucket_meta_hook(item).await { warn!(bucket = %bucket, error = ?err, "site replication bucket replication-config delete hook failed"); } - // TODO: remove targets info!(bucket = %bucket, "deleted bucket replication config"); Ok(S3Response::new(DeleteBucketReplicationOutput::default())) @@ -1868,6 +1933,52 @@ mod tests { req } + fn replication_rule_for_target(arn: &str) -> ReplicationRule { + ReplicationRule { + delete_marker_replication: None, + delete_replication: None, + destination: Destination { + bucket: arn.to_string(), + ..Default::default() + }, + existing_object_replication: None, + filter: None, + id: Some("rule-1".to_string()), + prefix: None, + priority: Some(1), + source_selection_criteria: None, + status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED), + } + } + + #[test] + fn replication_target_arns_use_role_when_present() { + let role = "arn:rustfs:replication:us-east-1:source:bucket"; + let destination = "arn:rustfs:replication:us-east-1:target:bucket"; + let config = ReplicationConfiguration { + role: role.to_string(), + rules: vec![replication_rule_for_target(destination)], + }; + + let arns = replication_target_arns(&config); + + assert!(arns.contains(role)); + assert!(!arns.contains(destination)); + } + + #[test] + fn replication_target_arns_use_rule_destinations_without_role() { + let destination = "arn:rustfs:replication:us-east-1:target:bucket"; + let config = ReplicationConfiguration { + role: String::new(), + rules: vec![replication_rule_for_target(destination)], + }; + + let arns = replication_target_arns(&config); + + assert!(arns.contains(destination)); + } + #[test] fn versioning_configuration_has_object_lock_incompatible_settings_rejects_suspended() { let config = VersioningConfiguration {