mirror of
https://github.com/rustfs/rustfs.git
synced 2026-05-06 14:12:29 +08:00
fix(replication): clean targets when deleting config (#2599)
This commit is contained in:
@@ -190,6 +190,14 @@ async fn put_bucket_replication(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_bucket_replication(
|
||||
env: &RustFSTestEnvironment,
|
||||
bucket: &str,
|
||||
) -> Result<reqwest::Response, Box<dyn Error + Send + Sync>> {
|
||||
let url = format!("{}/{bucket}?replication", env.url);
|
||||
signed_request(http::Method::DELETE, &url, &env.access_key, &env.secret_key, None, None).await
|
||||
}
|
||||
|
||||
async fn enable_bucket_versioning(env: &RustFSTestEnvironment, bucket: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let client = env.create_s3_client();
|
||||
client
|
||||
@@ -1084,6 +1092,54 @@ async fn test_remove_remote_target_rejects_target_used_by_replication() -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_delete_bucket_replication_removes_remote_target() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
init_logging();
|
||||
|
||||
let mut source_env = RustFSTestEnvironment::new().await?;
|
||||
source_env.start_rustfs_server(vec![]).await?;
|
||||
|
||||
let mut target_env = RustFSTestEnvironment::new().await?;
|
||||
target_env.start_rustfs_server_without_cleanup(vec![]).await?;
|
||||
|
||||
let source_bucket = "replication-delete-config-src";
|
||||
let target_bucket = "replication-delete-config-dst";
|
||||
|
||||
let source_client = source_env.create_s3_client();
|
||||
let target_client = target_env.create_s3_client();
|
||||
|
||||
source_client.create_bucket().bucket(source_bucket).send().await?;
|
||||
target_client.create_bucket().bucket(target_bucket).send().await?;
|
||||
enable_bucket_versioning(&source_env, source_bucket).await?;
|
||||
enable_bucket_versioning(&target_env, target_bucket).await?;
|
||||
|
||||
let target_arn = set_replication_target(&source_env, source_bucket, &target_env, target_bucket).await?;
|
||||
put_bucket_replication(&source_env, source_bucket, &target_arn).await?;
|
||||
|
||||
let delete_response = delete_bucket_replication(&source_env, source_bucket).await?;
|
||||
assert!(
|
||||
delete_response.status().is_success(),
|
||||
"unexpected delete status: {}",
|
||||
delete_response.status()
|
||||
);
|
||||
|
||||
let targets_response = list_replication_targets_request(&source_env, Some(source_bucket)).await?;
|
||||
assert_eq!(targets_response.status(), StatusCode::OK);
|
||||
let targets: Vec<serde_json::Value> = targets_response.json().await?;
|
||||
assert!(
|
||||
targets
|
||||
.iter()
|
||||
.all(|target| target.get("arn").and_then(|arn| arn.as_str()) != Some(target_arn.as_str())),
|
||||
"deleted replication config left stale target {target_arn}: {targets:?}"
|
||||
);
|
||||
|
||||
let recreated_arn = set_replication_target(&source_env, source_bucket, &target_env, target_bucket).await?;
|
||||
put_bucket_replication(&source_env, source_bucket, &recreated_arn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn test_site_replication_resync_start_cancel_restart_real_dual_node() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
|
||||
@@ -34,17 +34,19 @@ use http::StatusCode;
|
||||
use metrics::counter;
|
||||
use rustfs_config::RUSTFS_REGION;
|
||||
use rustfs_ecstore::bucket::{
|
||||
bucket_target_sys::BucketTargetSys,
|
||||
lifecycle::bucket_lifecycle_ops::{
|
||||
enqueue_expiry_for_existing_objects, enqueue_transition_for_existing_objects, validate_transition_tier,
|
||||
},
|
||||
metadata::{
|
||||
BUCKET_CORS_CONFIG, BUCKET_LIFECYCLE_CONFIG, BUCKET_NOTIFICATION_CONFIG, BUCKET_POLICY_CONFIG,
|
||||
BUCKET_PUBLIC_ACCESS_BLOCK_CONFIG, BUCKET_REPLICATION_CONFIG, BUCKET_SSECONFIG, BUCKET_TAGGING_CONFIG,
|
||||
BUCKET_VERSIONING_CONFIG,
|
||||
BUCKET_TARGETS_FILE, BUCKET_VERSIONING_CONFIG,
|
||||
},
|
||||
metadata_sys,
|
||||
object_lock::ObjectLockApi,
|
||||
policy_sys::PolicySys,
|
||||
target::BucketTargetType,
|
||||
utils::serialize,
|
||||
versioning::VersioningApi,
|
||||
versioning_sys::BucketVersioningSys,
|
||||
@@ -99,6 +101,59 @@ fn sr_bucket_meta_item(bucket: String, item_type: &str) -> SRBucketMeta {
|
||||
}
|
||||
}
|
||||
|
||||
fn replication_target_arns(config: &ReplicationConfiguration) -> HashSet<String> {
|
||||
let mut arns = HashSet::new();
|
||||
|
||||
if !config.role.trim().is_empty() {
|
||||
arns.insert(config.role.clone());
|
||||
return arns;
|
||||
}
|
||||
|
||||
for rule in &config.rules {
|
||||
let arn = rule.destination.bucket.trim();
|
||||
if !arn.is_empty() {
|
||||
arns.insert(arn.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
arns
|
||||
}
|
||||
|
||||
async fn remove_replication_targets_for_config(bucket: &str, config: &ReplicationConfiguration) -> S3Result<()> {
|
||||
let target_arns = replication_target_arns(config);
|
||||
if target_arns.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut targets = match metadata_sys::get_bucket_targets_config(bucket).await {
|
||||
Ok(targets) => targets,
|
||||
Err(StorageError::ConfigNotFound) => {
|
||||
BucketTargetSys::get().update_all_targets(bucket, None).await;
|
||||
return Ok(());
|
||||
}
|
||||
Err(err) => return Err(ApiError::from(err).into()),
|
||||
};
|
||||
|
||||
let original_len = targets.targets.len();
|
||||
targets.targets.retain(|target| {
|
||||
target.target_type != BucketTargetType::ReplicationService || !target_arns.contains(target.arn.as_str())
|
||||
});
|
||||
|
||||
if targets.targets.len() == original_len {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let removed = original_len - targets.targets.len();
|
||||
let json_targets = serde_json::to_vec(&targets).map_err(to_internal_error)?;
|
||||
metadata_sys::update(bucket, BUCKET_TARGETS_FILE, json_targets)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
BucketTargetSys::get().update_all_targets(bucket, Some(&targets)).await;
|
||||
info!(bucket = %bucket, removed, "removed replication remote targets referenced by deleted bucket replication config");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn versioning_configuration_has_object_lock_incompatible_settings(config: &VersioningConfiguration) -> bool {
|
||||
config.suspended()
|
||||
|| config.exclude_folders.unwrap_or(false)
|
||||
@@ -840,16 +895,26 @@ impl DefaultBucketUsecase {
|
||||
.get_bucket_info(&bucket, &BucketOptions::default())
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
let replication_config = match metadata_sys::get_replication_config(&bucket).await {
|
||||
Ok((config, _)) => Some(config),
|
||||
Err(StorageError::ConfigNotFound) => None,
|
||||
Err(err) => return Err(ApiError::from(err).into()),
|
||||
};
|
||||
|
||||
metadata_sys::delete(&bucket, BUCKET_REPLICATION_CONFIG)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
if let Some(config) = replication_config.as_ref()
|
||||
&& let Err(err) = remove_replication_targets_for_config(&bucket, config).await
|
||||
{
|
||||
warn!(bucket = %bucket, error = ?err, "failed to remove replication targets referenced by deleted bucket replication config");
|
||||
}
|
||||
|
||||
let item = sr_bucket_meta_item(bucket.clone(), "replication-config");
|
||||
if let Err(err) = site_replication_bucket_meta_hook(item).await {
|
||||
warn!(bucket = %bucket, error = ?err, "site replication bucket replication-config delete hook failed");
|
||||
}
|
||||
|
||||
// TODO: remove targets
|
||||
info!(bucket = %bucket, "deleted bucket replication config");
|
||||
|
||||
Ok(S3Response::new(DeleteBucketReplicationOutput::default()))
|
||||
@@ -1868,6 +1933,52 @@ mod tests {
|
||||
req
|
||||
}
|
||||
|
||||
fn replication_rule_for_target(arn: &str) -> ReplicationRule {
|
||||
ReplicationRule {
|
||||
delete_marker_replication: None,
|
||||
delete_replication: None,
|
||||
destination: Destination {
|
||||
bucket: arn.to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
existing_object_replication: None,
|
||||
filter: None,
|
||||
id: Some("rule-1".to_string()),
|
||||
prefix: None,
|
||||
priority: Some(1),
|
||||
source_selection_criteria: None,
|
||||
status: ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replication_target_arns_use_role_when_present() {
|
||||
let role = "arn:rustfs:replication:us-east-1:source:bucket";
|
||||
let destination = "arn:rustfs:replication:us-east-1:target:bucket";
|
||||
let config = ReplicationConfiguration {
|
||||
role: role.to_string(),
|
||||
rules: vec![replication_rule_for_target(destination)],
|
||||
};
|
||||
|
||||
let arns = replication_target_arns(&config);
|
||||
|
||||
assert!(arns.contains(role));
|
||||
assert!(!arns.contains(destination));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replication_target_arns_use_rule_destinations_without_role() {
|
||||
let destination = "arn:rustfs:replication:us-east-1:target:bucket";
|
||||
let config = ReplicationConfiguration {
|
||||
role: String::new(),
|
||||
rules: vec![replication_rule_for_target(destination)],
|
||||
};
|
||||
|
||||
let arns = replication_target_arns(&config);
|
||||
|
||||
assert!(arns.contains(destination));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn versioning_configuration_has_object_lock_incompatible_settings_rejects_suspended() {
|
||||
let config = VersioningConfiguration {
|
||||
|
||||
Reference in New Issue
Block a user