mirror of
https://github.com/rustfs/rustfs.git
synced 2026-05-07 23:27:50 +08:00
refactor(storage): inline object lock metadata writes
This commit is contained in:
@@ -58,9 +58,7 @@ use rustfs_ecstore::bucket::{
|
||||
metadata_sys,
|
||||
object_lock::{
|
||||
objectlock::{get_object_legalhold_meta, get_object_retention_meta},
|
||||
objectlock_sys::{
|
||||
BucketObjectLockSys, check_object_lock_for_deletion, check_retention_for_modification, is_retention_active,
|
||||
},
|
||||
objectlock_sys::{BucketObjectLockSys, check_object_lock_for_deletion, is_retention_active},
|
||||
},
|
||||
quota::QuotaOperation,
|
||||
replication::{
|
||||
@@ -792,60 +790,6 @@ impl DefaultObjectUsecase {
|
||||
result
|
||||
}
|
||||
|
||||
pub async fn execute_put_object_legal_hold(
|
||||
&self,
|
||||
req: S3Request<PutObjectLegalHoldInput>,
|
||||
) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
|
||||
let mut helper =
|
||||
OperationHelper::new(&req, EventName::ObjectCreatedPutLegalHold, S3Operation::PutObjectLegalHold).suppress_event();
|
||||
let PutObjectLegalHoldInput {
|
||||
bucket,
|
||||
key,
|
||||
legal_hold,
|
||||
version_id,
|
||||
..
|
||||
} = req.input.clone();
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
let _ = store
|
||||
.get_bucket_info(&bucket, &BucketOptions::default())
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
validate_bucket_object_lock_enabled(&bucket).await?;
|
||||
|
||||
let opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
let eval_metadata = parse_object_lock_legal_hold(legal_hold)?;
|
||||
|
||||
let popts = ObjectOptions {
|
||||
mod_time: opts.mod_time,
|
||||
version_id: opts.version_id,
|
||||
eval_metadata: Some(eval_metadata),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let info = store.put_object_metadata(&bucket, &key, &popts).await.map_err(|e| {
|
||||
error!("put_object_metadata failed, {}", e.to_string());
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
})?;
|
||||
|
||||
let output = PutObjectLegalHoldOutput {
|
||||
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
|
||||
};
|
||||
let version_id = req.input.version_id.clone().unwrap_or_default();
|
||||
helper = helper.object(info).version_id(version_id);
|
||||
|
||||
let result = Ok(S3Response::new(output));
|
||||
let _ = helper.complete(&result);
|
||||
result
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
pub async fn execute_put_object_lock_configuration(
|
||||
&self,
|
||||
@@ -916,94 +860,6 @@ impl DefaultObjectUsecase {
|
||||
Ok(S3Response::new(PutObjectLockConfigurationOutput::default()))
|
||||
}
|
||||
|
||||
pub async fn execute_put_object_retention(
|
||||
&self,
|
||||
req: S3Request<PutObjectRetentionInput>,
|
||||
) -> S3Result<S3Response<PutObjectRetentionOutput>> {
|
||||
let mut helper =
|
||||
OperationHelper::new(&req, EventName::ObjectCreatedPutRetention, S3Operation::PutObjectRetention).suppress_event();
|
||||
let PutObjectRetentionInput {
|
||||
bucket,
|
||||
key,
|
||||
retention,
|
||||
version_id,
|
||||
..
|
||||
} = req.input.clone();
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
validate_bucket_object_lock_enabled(&bucket).await?;
|
||||
|
||||
let new_retain_until = retention
|
||||
.as_ref()
|
||||
.and_then(|r| r.retain_until_date.as_ref())
|
||||
.map(|d| OffsetDateTime::from(d.clone()));
|
||||
let new_mode = retention.as_ref().and_then(|r| r.mode.as_ref()).map(|mode| mode.as_str());
|
||||
|
||||
// TODO(security): Known TOCTOU race condition (fix in future PR).
|
||||
//
|
||||
// There is a time-of-check-time-of-use (TOCTOU) window between the retention
|
||||
// check below (using get_object_info + check_retention_for_modification) and
|
||||
// the actual update performed later in put_object_metadata.
|
||||
//
|
||||
// In theory:
|
||||
// * Thread A reads retention mode = GOVERNANCE and checks the bypass header.
|
||||
// * Thread B updates retention to COMPLIANCE mode.
|
||||
// * Thread A then proceeds to modify retention, still assuming GOVERNANCE,
|
||||
// and effectively bypasses what is now COMPLIANCE mode.
|
||||
//
|
||||
// This would violate the S3 spec, which states that COMPLIANCE-mode retention
|
||||
// cannot be modified even with a bypass header.
|
||||
//
|
||||
// Possible fixes (to be implemented in a future change):
|
||||
// 1. Pass the expected retention mode down to the storage layer and verify
|
||||
// it has not changed immediately before the update.
|
||||
// 2. Use optimistic concurrency (e.g., version/etag) so that the update
|
||||
// fails if the object changed between check and update.
|
||||
// 3. Perform the retention check inside the same lock/transaction scope as
|
||||
// the metadata update within the storage layer.
|
||||
//
|
||||
// Current mitigation: the storage layer provides a fast_lock_manager, which
|
||||
// offers some protection, but it does not fully eliminate this race.
|
||||
let check_opts: ObjectOptions = get_opts(&bucket, &key, version_id.clone(), None, &req.headers)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
if let Ok(existing_obj_info) = store.get_object_info(&bucket, &key, &check_opts).await {
|
||||
let bypass_governance = has_bypass_governance_header(&req.headers);
|
||||
if let Some(block_reason) =
|
||||
check_retention_for_modification(&existing_obj_info.user_defined, new_mode, new_retain_until, bypass_governance)
|
||||
{
|
||||
return Err(S3Error::with_message(S3ErrorCode::AccessDenied, block_reason.error_message()));
|
||||
}
|
||||
}
|
||||
|
||||
let eval_metadata = parse_object_lock_retention(retention)?;
|
||||
|
||||
let mut opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
opts.eval_metadata = Some(eval_metadata);
|
||||
|
||||
let object_info = store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| {
|
||||
error!("put_object_metadata failed, {}", e.to_string());
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
})?;
|
||||
|
||||
let output = PutObjectRetentionOutput {
|
||||
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
|
||||
};
|
||||
|
||||
let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
|
||||
helper = helper.object(object_info).version_id(version_id);
|
||||
|
||||
let result = Ok(S3Response::new(output));
|
||||
let _ = helper.complete(&result);
|
||||
result
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
skip(self, req),
|
||||
@@ -3004,21 +2860,6 @@ mod tests {
|
||||
assert_eq!(err.code(), &S3ErrorCode::InternalError);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_put_object_legal_hold_returns_internal_error_when_store_uninitialized() {
|
||||
let input = PutObjectLegalHoldInput::builder()
|
||||
.bucket("test-bucket".to_string())
|
||||
.key("test-key".to_string())
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let req = build_request(input, Method::PUT);
|
||||
let usecase = DefaultObjectUsecase::without_context();
|
||||
|
||||
let err = usecase.execute_put_object_legal_hold(req).await.unwrap_err();
|
||||
assert_eq!(err.code(), &S3ErrorCode::InternalError);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_put_object_lock_configuration_returns_internal_error_when_store_uninitialized() {
|
||||
let input = PutObjectLockConfigurationInput::builder()
|
||||
@@ -3127,21 +2968,6 @@ mod tests {
|
||||
assert_eq!(err.code(), &S3ErrorCode::Custom("InvalidRetentionPeriod".into()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_put_object_retention_returns_internal_error_when_store_uninitialized() {
|
||||
let input = PutObjectRetentionInput::builder()
|
||||
.bucket("test-bucket".to_string())
|
||||
.key("test-key".to_string())
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let req = build_request(input, Method::PUT);
|
||||
let usecase = DefaultObjectUsecase::without_context();
|
||||
|
||||
let err = usecase.execute_put_object_retention(req).await.unwrap_err();
|
||||
assert_eq!(err.code(), &S3ErrorCode::InternalError);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_head_object_rejects_range_with_part_number() {
|
||||
let input = HeadObjectInput::builder()
|
||||
|
||||
@@ -16,15 +16,17 @@ use crate::app::bucket_usecase::DefaultBucketUsecase;
|
||||
use crate::app::multipart_usecase::DefaultMultipartUsecase;
|
||||
use crate::app::object_usecase::DefaultObjectUsecase;
|
||||
use crate::error::ApiError;
|
||||
use crate::storage::access::has_bypass_governance_header;
|
||||
use crate::storage::helper::OperationHelper;
|
||||
use crate::storage::options::get_opts;
|
||||
use crate::storage::s3_api::acl;
|
||||
use crate::storage::validate_bucket_object_lock_enabled;
|
||||
use crate::storage::{parse_object_lock_legal_hold, parse_object_lock_retention, validate_bucket_object_lock_enabled};
|
||||
use metrics::{counter, histogram};
|
||||
use rustfs_ecstore::{
|
||||
bucket::{
|
||||
metadata::{BUCKET_ACCELERATE_CONFIG, BUCKET_LOGGING_CONFIG, BUCKET_REQUEST_PAYMENT_CONFIG, BUCKET_WEBSITE_CONFIG},
|
||||
metadata_sys,
|
||||
object_lock::objectlock_sys::check_retention_for_modification,
|
||||
tagging::{decode_tags, decode_tags_to_map, encode_tags},
|
||||
utils::serialize,
|
||||
},
|
||||
@@ -969,8 +971,54 @@ impl S3 for FS {
|
||||
&self,
|
||||
req: S3Request<PutObjectLegalHoldInput>,
|
||||
) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
|
||||
let usecase = DefaultObjectUsecase::from_global();
|
||||
usecase.execute_put_object_legal_hold(req).await
|
||||
let mut helper =
|
||||
OperationHelper::new(&req, EventName::ObjectCreatedPutLegalHold, S3Operation::PutObjectLegalHold).suppress_event();
|
||||
let PutObjectLegalHoldInput {
|
||||
bucket,
|
||||
key,
|
||||
legal_hold,
|
||||
version_id,
|
||||
..
|
||||
} = req.input.clone();
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
let _ = store
|
||||
.get_bucket_info(&bucket, &BucketOptions::default())
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
validate_bucket_object_lock_enabled(&bucket).await?;
|
||||
|
||||
let opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
let eval_metadata = parse_object_lock_legal_hold(legal_hold)?;
|
||||
|
||||
let popts = ObjectOptions {
|
||||
mod_time: opts.mod_time,
|
||||
version_id: opts.version_id,
|
||||
eval_metadata: Some(eval_metadata),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let info = store.put_object_metadata(&bucket, &key, &popts).await.map_err(|e| {
|
||||
error!("put_object_metadata failed, {}", e.to_string());
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
})?;
|
||||
|
||||
let output = PutObjectLegalHoldOutput {
|
||||
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
|
||||
};
|
||||
let version_id = req.input.version_id.clone().unwrap_or_default();
|
||||
helper = helper.object(info).version_id(version_id);
|
||||
|
||||
let result = Ok(S3Response::new(output));
|
||||
let _ = helper.complete(&result);
|
||||
result
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
@@ -986,8 +1034,88 @@ impl S3 for FS {
|
||||
&self,
|
||||
req: S3Request<PutObjectRetentionInput>,
|
||||
) -> S3Result<S3Response<PutObjectRetentionOutput>> {
|
||||
let usecase = DefaultObjectUsecase::from_global();
|
||||
usecase.execute_put_object_retention(req).await
|
||||
let mut helper =
|
||||
OperationHelper::new(&req, EventName::ObjectCreatedPutRetention, S3Operation::PutObjectRetention).suppress_event();
|
||||
let PutObjectRetentionInput {
|
||||
bucket,
|
||||
key,
|
||||
retention,
|
||||
version_id,
|
||||
..
|
||||
} = req.input.clone();
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
validate_bucket_object_lock_enabled(&bucket).await?;
|
||||
|
||||
let new_retain_until = retention
|
||||
.as_ref()
|
||||
.and_then(|r| r.retain_until_date.as_ref())
|
||||
.map(|d| OffsetDateTime::from(d.clone()));
|
||||
let new_mode = retention.as_ref().and_then(|r| r.mode.as_ref()).map(|mode| mode.as_str());
|
||||
|
||||
// TODO(security): Known TOCTOU race condition (fix in future PR).
|
||||
//
|
||||
// There is a time-of-check-time-of-use (TOCTOU) window between the retention
|
||||
// check below (using get_object_info + check_retention_for_modification) and
|
||||
// the actual update performed later in put_object_metadata.
|
||||
//
|
||||
// In theory:
|
||||
// * Thread A reads retention mode = GOVERNANCE and checks the bypass header.
|
||||
// * Thread B updates retention to COMPLIANCE mode.
|
||||
// * Thread A then proceeds to modify retention, still assuming GOVERNANCE,
|
||||
// and effectively bypasses what is now COMPLIANCE mode.
|
||||
//
|
||||
// This would violate the S3 spec, which states that COMPLIANCE-mode retention
|
||||
// cannot be modified even with a bypass header.
|
||||
//
|
||||
// Possible fixes (to be implemented in a future change):
|
||||
// 1. Pass the expected retention mode down to the storage layer and verify
|
||||
// it has not changed immediately before the update.
|
||||
// 2. Use optimistic concurrency (e.g., version/etag) so that the update
|
||||
// fails if the object changed between check and update.
|
||||
// 3. Perform the retention check inside the same lock/transaction scope as
|
||||
// the metadata update within the storage layer.
|
||||
//
|
||||
// Current mitigation: the storage layer provides a fast_lock_manager, which
|
||||
// offers some protection, but it does not fully eliminate this race.
|
||||
let check_opts: ObjectOptions = get_opts(&bucket, &key, version_id.clone(), None, &req.headers)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
if let Ok(existing_obj_info) = store.get_object_info(&bucket, &key, &check_opts).await {
|
||||
let bypass_governance = has_bypass_governance_header(&req.headers);
|
||||
if let Some(block_reason) =
|
||||
check_retention_for_modification(&existing_obj_info.user_defined, new_mode, new_retain_until, bypass_governance)
|
||||
{
|
||||
return Err(S3Error::with_message(S3ErrorCode::AccessDenied, block_reason.error_message()));
|
||||
}
|
||||
}
|
||||
|
||||
let eval_metadata = parse_object_lock_retention(retention)?;
|
||||
|
||||
let mut opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers)
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
opts.eval_metadata = Some(eval_metadata);
|
||||
|
||||
let object_info = store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| {
|
||||
error!("put_object_metadata failed, {}", e.to_string());
|
||||
s3_error!(InternalError, "{}", e.to_string())
|
||||
})?;
|
||||
|
||||
let output = PutObjectRetentionOutput {
|
||||
request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)),
|
||||
};
|
||||
|
||||
let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string());
|
||||
helper = helper.object(object_info).version_id(version_id);
|
||||
|
||||
let result = Ok(S3Response::new(output));
|
||||
let _ = helper.complete(&result);
|
||||
result
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self, req))]
|
||||
|
||||
@@ -37,8 +37,8 @@ mod tests {
|
||||
use s3s::dto::{
|
||||
CORSConfiguration, CORSRule, DeleteObjectTaggingInput, Delimiter, GetObjectAclInput, GetObjectLegalHoldInput,
|
||||
GetObjectRetentionInput, GetObjectTaggingInput, LambdaFunctionConfiguration, ObjectLockLegalHold,
|
||||
ObjectLockLegalHoldStatus, ObjectLockRetention, ObjectLockRetentionMode, PutObjectTaggingInput, QueueConfiguration, Tag,
|
||||
Tagging, TopicConfiguration,
|
||||
ObjectLockLegalHoldStatus, ObjectLockRetention, ObjectLockRetentionMode, PutObjectLegalHoldInput,
|
||||
PutObjectRetentionInput, PutObjectTaggingInput, QueueConfiguration, Tag, Tagging, TopicConfiguration,
|
||||
};
|
||||
use s3s::{S3, S3Error, S3ErrorCode, S3Request, s3_error};
|
||||
use time::OffsetDateTime;
|
||||
@@ -225,6 +225,32 @@ mod tests {
|
||||
assert_eq!(err.code(), &S3ErrorCode::InternalError);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_object_legal_hold_returns_internal_error_when_store_uninitialized() {
|
||||
let input = PutObjectLegalHoldInput::builder()
|
||||
.bucket("test-bucket".to_string())
|
||||
.key("test-key".to_string())
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let fs = FS::new();
|
||||
let err = fs.put_object_legal_hold(build_request(input, Method::PUT)).await.unwrap_err();
|
||||
assert_eq!(err.code(), &S3ErrorCode::InternalError);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_object_retention_returns_internal_error_when_store_uninitialized() {
|
||||
let input = PutObjectRetentionInput::builder()
|
||||
.bucket("test-bucket".to_string())
|
||||
.key("test-key".to_string())
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let fs = FS::new();
|
||||
let err = fs.put_object_retention(build_request(input, Method::PUT)).await.unwrap_err();
|
||||
assert_eq!(err.code(), &S3ErrorCode::InternalError);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_object_tagging_returns_internal_error_when_store_uninitialized() {
|
||||
let input = GetObjectTaggingInput::builder()
|
||||
|
||||
Reference in New Issue
Block a user