mirror of
https://github.com/rustfs/rustfs.git
synced 2026-05-06 22:28:16 +08:00
fix: prevent object lock retention race (#2634)
This commit is contained in:
@@ -18,6 +18,7 @@
|
||||
use crate::batch_processor::{AsyncBatchProcessor, get_global_processors};
|
||||
use crate::bitrot::{create_bitrot_reader, create_bitrot_writer};
|
||||
use crate::bucket::lifecycle::lifecycle::TRANSITION_COMPLETE;
|
||||
use crate::bucket::object_lock::objectlock_sys::check_retention_for_modification;
|
||||
use crate::bucket::replication::check_replicate_delete;
|
||||
use crate::bucket::versioning::VersioningApi;
|
||||
use crate::bucket::versioning_sys::BucketVersioningSys;
|
||||
@@ -1343,6 +1344,22 @@ impl BucketOperations for SetDisks {
|
||||
}
|
||||
}
|
||||
|
||||
fn check_object_lock_retention_update(bucket: &str, object: &str, obj_info: &ObjectInfo, opts: &ObjectOptions) -> Result<()> {
|
||||
if let Some(retention) = &opts.object_lock_retention
|
||||
&& check_retention_for_modification(
|
||||
&obj_info.user_defined,
|
||||
retention.mode.as_deref(),
|
||||
retention.retain_until,
|
||||
retention.bypass_governance,
|
||||
)
|
||||
.is_some()
|
||||
{
|
||||
return Err(StorageError::PrefixAccessDenied(bucket.to_string(), object.to_string()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ObjectOperations for SetDisks {
|
||||
#[tracing::instrument(skip(self))]
|
||||
@@ -1989,6 +2006,8 @@ impl ObjectOperations for SetDisks {
|
||||
|
||||
let obj_info = ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended);
|
||||
|
||||
check_object_lock_retention_update(bucket, object, &obj_info, opts)?;
|
||||
|
||||
for (k, v) in obj_info.user_defined {
|
||||
fi.metadata.insert(k, v);
|
||||
}
|
||||
@@ -5483,6 +5502,74 @@ mod tests {
|
||||
assert!(rendered.contains("object"), "{rendered}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_object_lock_retention_update_blocks_compliance_shorten() {
|
||||
let now = OffsetDateTime::now_utc();
|
||||
let existing_until = now + Duration::from_secs(60 * 60 * 24 * 60);
|
||||
let requested_until = now + Duration::from_secs(60 * 60 * 24);
|
||||
|
||||
let mut user_defined = HashMap::new();
|
||||
user_defined.insert(
|
||||
X_AMZ_OBJECT_LOCK_MODE.as_str().to_string(),
|
||||
s3s::dto::ObjectLockRetentionMode::COMPLIANCE.to_string(),
|
||||
);
|
||||
user_defined.insert(
|
||||
X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_string(),
|
||||
existing_until.format(&time::format_description::well_known::Rfc3339).unwrap(),
|
||||
);
|
||||
|
||||
let obj_info = ObjectInfo {
|
||||
user_defined,
|
||||
..Default::default()
|
||||
};
|
||||
let opts = ObjectOptions {
|
||||
object_lock_retention: Some(crate::store_api::ObjectLockRetentionOptions {
|
||||
mode: Some(s3s::dto::ObjectLockRetentionMode::COMPLIANCE.to_string()),
|
||||
retain_until: Some(requested_until),
|
||||
bypass_governance: true,
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let err = check_object_lock_retention_update("bucket", "object", &obj_info, &opts)
|
||||
.expect_err("COMPLIANCE shortening must be blocked");
|
||||
|
||||
assert!(matches!(err, StorageError::PrefixAccessDenied(_, _)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_object_lock_retention_update_allows_governance_shorten_with_bypass() {
|
||||
let now = OffsetDateTime::now_utc();
|
||||
let existing_until = now + Duration::from_secs(60 * 60 * 24 * 60);
|
||||
let requested_until = now + Duration::from_secs(60 * 60 * 24);
|
||||
|
||||
let mut user_defined = HashMap::new();
|
||||
user_defined.insert(
|
||||
X_AMZ_OBJECT_LOCK_MODE.as_str().to_string(),
|
||||
s3s::dto::ObjectLockRetentionMode::GOVERNANCE.to_string(),
|
||||
);
|
||||
user_defined.insert(
|
||||
X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str().to_string(),
|
||||
existing_until.format(&time::format_description::well_known::Rfc3339).unwrap(),
|
||||
);
|
||||
|
||||
let obj_info = ObjectInfo {
|
||||
user_defined,
|
||||
..Default::default()
|
||||
};
|
||||
let opts = ObjectOptions {
|
||||
object_lock_retention: Some(crate::store_api::ObjectLockRetentionOptions {
|
||||
mode: Some(s3s::dto::ObjectLockRetentionMode::GOVERNANCE.to_string()),
|
||||
retain_until: Some(requested_until),
|
||||
bypass_governance: true,
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
check_object_lock_retention_update("bucket", "object", &obj_info, &opts)
|
||||
.expect("GOVERNANCE shortening with bypass should remain allowed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_should_prevent_write() {
|
||||
let oi = ObjectInfo {
|
||||
|
||||
@@ -43,6 +43,13 @@ impl HTTPPreconditions {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ObjectLockRetentionOptions {
|
||||
pub mode: Option<String>,
|
||||
pub retain_until: Option<OffsetDateTime>,
|
||||
pub bypass_governance: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ObjectOptions {
|
||||
// Use the maximum parity (N/2), used when saving server configuration files
|
||||
@@ -79,6 +86,7 @@ pub struct ObjectOptions {
|
||||
pub lifecycle_audit_event: LcAuditEvent,
|
||||
|
||||
pub eval_metadata: Option<HashMap<String, String>>,
|
||||
pub object_lock_retention: Option<ObjectLockRetentionOptions>,
|
||||
|
||||
pub want_checksum: Option<Checksum>,
|
||||
pub skip_verify_bitrot: bool,
|
||||
|
||||
@@ -38,7 +38,7 @@ use rustfs_ecstore::{
|
||||
},
|
||||
error::{StorageError, is_err_bucket_not_found, is_err_object_not_found, is_err_version_not_found},
|
||||
new_object_layer_fn,
|
||||
store_api::{BucketOperations, BucketOptions, ObjectOperations, ObjectOptions},
|
||||
store_api::{BucketOperations, BucketOptions, ObjectLockRetentionOptions, ObjectOperations, ObjectOptions},
|
||||
};
|
||||
use rustfs_s3_common::{S3Operation, record_s3_op};
|
||||
use rustfs_targets::EventName;
|
||||
@@ -1284,44 +1284,27 @@ impl S3 for FS {
|
||||
.as_ref()
|
||||
.and_then(|r| r.retain_until_date.as_ref())
|
||||
.map(|d| OffsetDateTime::from(d.clone()));
|
||||
let new_mode = retention.as_ref().and_then(|r| r.mode.as_ref()).map(|mode| mode.as_str());
|
||||
let new_mode = retention
|
||||
.as_ref()
|
||||
.and_then(|r| r.mode.as_ref())
|
||||
.map(|mode| mode.as_str().to_string());
|
||||
|
||||
// TODO(security): Known TOCTOU race condition (fix in future PR).
|
||||
//
|
||||
// There is a time-of-check-time-of-use (TOCTOU) window between the retention
|
||||
// check below (using get_object_info + check_retention_for_modification) and
|
||||
// the actual update performed later in put_object_metadata.
|
||||
//
|
||||
// In theory:
|
||||
// * Thread A reads retention mode = GOVERNANCE and checks the bypass header.
|
||||
// * Thread B updates retention to COMPLIANCE mode.
|
||||
// * Thread A then proceeds to modify retention, still assuming GOVERNANCE,
|
||||
// and effectively bypasses what is now COMPLIANCE mode.
|
||||
//
|
||||
// This would violate the S3 spec, which states that COMPLIANCE-mode retention
|
||||
// cannot be modified even with a bypass header.
|
||||
//
|
||||
// Possible fixes (to be implemented in a future change):
|
||||
// 1. Pass the expected retention mode down to the storage layer and verify
|
||||
// it has not changed immediately before the update.
|
||||
// 2. Use optimistic concurrency (e.g., version/etag) so that the update
|
||||
// fails if the object changed between check and update.
|
||||
// 3. Perform the retention check inside the same lock/transaction scope as
|
||||
// the metadata update within the storage layer.
|
||||
//
|
||||
// Current mitigation: the storage layer provides a fast_lock_manager, which
|
||||
// offers some protection, but it does not fully eliminate this race.
|
||||
let bypass_governance = has_bypass_governance_header(&req.headers);
|
||||
// Keep the early check for existing response behavior; put_object_metadata
|
||||
// repeats the same check after taking the metadata write lock.
|
||||
let check_opts: ObjectOptions = get_opts(&bucket, &key, version_id.clone(), None, &req.headers)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
if let Ok(existing_obj_info) = store.get_object_info(&bucket, &key, &check_opts).await {
|
||||
let bypass_governance = has_bypass_governance_header(&req.headers);
|
||||
if let Some(block_reason) =
|
||||
check_retention_for_modification(&existing_obj_info.user_defined, new_mode, new_retain_until, bypass_governance)
|
||||
{
|
||||
return Err(S3Error::with_message(S3ErrorCode::AccessDenied, block_reason.error_message()));
|
||||
}
|
||||
if let Ok(existing_obj_info) = store.get_object_info(&bucket, &key, &check_opts).await
|
||||
&& let Some(block_reason) = check_retention_for_modification(
|
||||
&existing_obj_info.user_defined,
|
||||
new_mode.as_deref(),
|
||||
new_retain_until,
|
||||
bypass_governance,
|
||||
)
|
||||
{
|
||||
return Err(S3Error::with_message(S3ErrorCode::AccessDenied, block_reason.error_message()));
|
||||
}
|
||||
|
||||
let eval_metadata = parse_object_lock_retention(retention)?;
|
||||
@@ -1330,10 +1313,15 @@ impl S3 for FS {
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
opts.eval_metadata = Some(eval_metadata);
|
||||
opts.object_lock_retention = Some(ObjectLockRetentionOptions {
|
||||
mode: new_mode,
|
||||
retain_until: new_retain_until,
|
||||
bypass_governance,
|
||||
});
|
||||
|
||||
let object_info = store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| {
|
||||
error!("put_object_metadata failed, {}", e.to_string());
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
S3Error::from(ApiError::from(e))
|
||||
})?;
|
||||
|
||||
let output = PutObjectRetentionOutput {
|
||||
|
||||
Reference in New Issue
Block a user