diff --git a/rustfs/src/app/object_usecase.rs b/rustfs/src/app/object_usecase.rs index 6a669d027..3408cb7e1 100644 --- a/rustfs/src/app/object_usecase.rs +++ b/rustfs/src/app/object_usecase.rs @@ -58,9 +58,7 @@ use rustfs_ecstore::bucket::{ metadata_sys, object_lock::{ objectlock::{get_object_legalhold_meta, get_object_retention_meta}, - objectlock_sys::{ - BucketObjectLockSys, check_object_lock_for_deletion, check_retention_for_modification, is_retention_active, - }, + objectlock_sys::{BucketObjectLockSys, check_object_lock_for_deletion, is_retention_active}, }, quota::QuotaOperation, replication::{ @@ -792,60 +790,6 @@ impl DefaultObjectUsecase { result } - pub async fn execute_put_object_legal_hold( - &self, - req: S3Request, - ) -> S3Result> { - let mut helper = - OperationHelper::new(&req, EventName::ObjectCreatedPutLegalHold, S3Operation::PutObjectLegalHold).suppress_event(); - let PutObjectLegalHoldInput { - bucket, - key, - legal_hold, - version_id, - .. - } = req.input.clone(); - - let Some(store) = new_object_layer_fn() else { - return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); - }; - - let _ = store - .get_bucket_info(&bucket, &BucketOptions::default()) - .await - .map_err(ApiError::from)?; - - validate_bucket_object_lock_enabled(&bucket).await?; - - let opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers) - .await - .map_err(ApiError::from)?; - - let eval_metadata = parse_object_lock_legal_hold(legal_hold)?; - - let popts = ObjectOptions { - mod_time: opts.mod_time, - version_id: opts.version_id, - eval_metadata: Some(eval_metadata), - ..Default::default() - }; - - let info = store.put_object_metadata(&bucket, &key, &popts).await.map_err(|e| { - error!("put_object_metadata failed, {}", e.to_string()); - s3_error!(InternalError, "{}", e.to_string()) - })?; - - let output = PutObjectLegalHoldOutput { - request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)), - }; - let version_id = req.input.version_id.clone().unwrap_or_default(); - helper = helper.object(info).version_id(version_id); - - let result = Ok(S3Response::new(output)); - let _ = helper.complete(&result); - result - } - #[instrument(level = "debug", skip(self))] pub async fn execute_put_object_lock_configuration( &self, @@ -916,94 +860,6 @@ impl DefaultObjectUsecase { Ok(S3Response::new(PutObjectLockConfigurationOutput::default())) } - pub async fn execute_put_object_retention( - &self, - req: S3Request, - ) -> S3Result> { - let mut helper = - OperationHelper::new(&req, EventName::ObjectCreatedPutRetention, S3Operation::PutObjectRetention).suppress_event(); - let PutObjectRetentionInput { - bucket, - key, - retention, - version_id, - .. - } = req.input.clone(); - - let Some(store) = new_object_layer_fn() else { - return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); - }; - - validate_bucket_object_lock_enabled(&bucket).await?; - - let new_retain_until = retention - .as_ref() - .and_then(|r| r.retain_until_date.as_ref()) - .map(|d| OffsetDateTime::from(d.clone())); - let new_mode = retention.as_ref().and_then(|r| r.mode.as_ref()).map(|mode| mode.as_str()); - - // TODO(security): Known TOCTOU race condition (fix in future PR). - // - // There is a time-of-check-time-of-use (TOCTOU) window between the retention - // check below (using get_object_info + check_retention_for_modification) and - // the actual update performed later in put_object_metadata. - // - // In theory: - // * Thread A reads retention mode = GOVERNANCE and checks the bypass header. - // * Thread B updates retention to COMPLIANCE mode. - // * Thread A then proceeds to modify retention, still assuming GOVERNANCE, - // and effectively bypasses what is now COMPLIANCE mode. - // - // This would violate the S3 spec, which states that COMPLIANCE-mode retention - // cannot be modified even with a bypass header. - // - // Possible fixes (to be implemented in a future change): - // 1. Pass the expected retention mode down to the storage layer and verify - // it has not changed immediately before the update. - // 2. Use optimistic concurrency (e.g., version/etag) so that the update - // fails if the object changed between check and update. - // 3. Perform the retention check inside the same lock/transaction scope as - // the metadata update within the storage layer. - // - // Current mitigation: the storage layer provides a fast_lock_manager, which - // offers some protection, but it does not fully eliminate this race. - let check_opts: ObjectOptions = get_opts(&bucket, &key, version_id.clone(), None, &req.headers) - .await - .map_err(ApiError::from)?; - - if let Ok(existing_obj_info) = store.get_object_info(&bucket, &key, &check_opts).await { - let bypass_governance = has_bypass_governance_header(&req.headers); - if let Some(block_reason) = - check_retention_for_modification(&existing_obj_info.user_defined, new_mode, new_retain_until, bypass_governance) - { - return Err(S3Error::with_message(S3ErrorCode::AccessDenied, block_reason.error_message())); - } - } - - let eval_metadata = parse_object_lock_retention(retention)?; - - let mut opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers) - .await - .map_err(ApiError::from)?; - opts.eval_metadata = Some(eval_metadata); - - let object_info = store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| { - error!("put_object_metadata failed, {}", e.to_string()); - s3_error!(InternalError, "{}", e.to_string()) - })?; - - let output = PutObjectRetentionOutput { - request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)), - }; - - let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string()); - helper = helper.object(object_info).version_id(version_id); - - let result = Ok(S3Response::new(output)); - let _ = helper.complete(&result); - result - } - #[instrument( level = "debug", skip(self, req), @@ -3004,21 +2860,6 @@ mod tests { assert_eq!(err.code(), &S3ErrorCode::InternalError); } - #[tokio::test] - async fn execute_put_object_legal_hold_returns_internal_error_when_store_uninitialized() { - let input = PutObjectLegalHoldInput::builder() - .bucket("test-bucket".to_string()) - .key("test-key".to_string()) - .build() - .unwrap(); - - let req = build_request(input, Method::PUT); - let usecase = DefaultObjectUsecase::without_context(); - - let err = usecase.execute_put_object_legal_hold(req).await.unwrap_err(); - assert_eq!(err.code(), &S3ErrorCode::InternalError); - } - #[tokio::test] async fn execute_put_object_lock_configuration_returns_internal_error_when_store_uninitialized() { let input = PutObjectLockConfigurationInput::builder() @@ -3127,21 +2968,6 @@ mod tests { assert_eq!(err.code(), &S3ErrorCode::Custom("InvalidRetentionPeriod".into())); } - #[tokio::test] - async fn execute_put_object_retention_returns_internal_error_when_store_uninitialized() { - let input = PutObjectRetentionInput::builder() - .bucket("test-bucket".to_string()) - .key("test-key".to_string()) - .build() - .unwrap(); - - let req = build_request(input, Method::PUT); - let usecase = DefaultObjectUsecase::without_context(); - - let err = usecase.execute_put_object_retention(req).await.unwrap_err(); - assert_eq!(err.code(), &S3ErrorCode::InternalError); - } - #[tokio::test] async fn execute_head_object_rejects_range_with_part_number() { let input = HeadObjectInput::builder() diff --git a/rustfs/src/storage/ecfs.rs b/rustfs/src/storage/ecfs.rs index b523b5ffe..f953d25f1 100644 --- a/rustfs/src/storage/ecfs.rs +++ b/rustfs/src/storage/ecfs.rs @@ -16,15 +16,17 @@ use crate::app::bucket_usecase::DefaultBucketUsecase; use crate::app::multipart_usecase::DefaultMultipartUsecase; use crate::app::object_usecase::DefaultObjectUsecase; use crate::error::ApiError; +use crate::storage::access::has_bypass_governance_header; use crate::storage::helper::OperationHelper; use crate::storage::options::get_opts; use crate::storage::s3_api::acl; -use crate::storage::validate_bucket_object_lock_enabled; +use crate::storage::{parse_object_lock_legal_hold, parse_object_lock_retention, validate_bucket_object_lock_enabled}; use metrics::{counter, histogram}; use rustfs_ecstore::{ bucket::{ metadata::{BUCKET_ACCELERATE_CONFIG, BUCKET_LOGGING_CONFIG, BUCKET_REQUEST_PAYMENT_CONFIG, BUCKET_WEBSITE_CONFIG}, metadata_sys, + object_lock::objectlock_sys::check_retention_for_modification, tagging::{decode_tags, decode_tags_to_map, encode_tags}, utils::serialize, }, @@ -969,8 +971,54 @@ impl S3 for FS { &self, req: S3Request, ) -> S3Result> { - let usecase = DefaultObjectUsecase::from_global(); - usecase.execute_put_object_legal_hold(req).await + let mut helper = + OperationHelper::new(&req, EventName::ObjectCreatedPutLegalHold, S3Operation::PutObjectLegalHold).suppress_event(); + let PutObjectLegalHoldInput { + bucket, + key, + legal_hold, + version_id, + .. + } = req.input.clone(); + + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + let _ = store + .get_bucket_info(&bucket, &BucketOptions::default()) + .await + .map_err(ApiError::from)?; + + validate_bucket_object_lock_enabled(&bucket).await?; + + let opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers) + .await + .map_err(ApiError::from)?; + + let eval_metadata = parse_object_lock_legal_hold(legal_hold)?; + + let popts = ObjectOptions { + mod_time: opts.mod_time, + version_id: opts.version_id, + eval_metadata: Some(eval_metadata), + ..Default::default() + }; + + let info = store.put_object_metadata(&bucket, &key, &popts).await.map_err(|e| { + error!("put_object_metadata failed, {}", e.to_string()); + s3_error!(InternalError, "{}", e.to_string()) + })?; + + let output = PutObjectLegalHoldOutput { + request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)), + }; + let version_id = req.input.version_id.clone().unwrap_or_default(); + helper = helper.object(info).version_id(version_id); + + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } #[instrument(level = "debug", skip(self))] @@ -986,8 +1034,88 @@ impl S3 for FS { &self, req: S3Request, ) -> S3Result> { - let usecase = DefaultObjectUsecase::from_global(); - usecase.execute_put_object_retention(req).await + let mut helper = + OperationHelper::new(&req, EventName::ObjectCreatedPutRetention, S3Operation::PutObjectRetention).suppress_event(); + let PutObjectRetentionInput { + bucket, + key, + retention, + version_id, + .. + } = req.input.clone(); + + let Some(store) = new_object_layer_fn() else { + return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string())); + }; + + validate_bucket_object_lock_enabled(&bucket).await?; + + let new_retain_until = retention + .as_ref() + .and_then(|r| r.retain_until_date.as_ref()) + .map(|d| OffsetDateTime::from(d.clone())); + let new_mode = retention.as_ref().and_then(|r| r.mode.as_ref()).map(|mode| mode.as_str()); + + // TODO(security): Known TOCTOU race condition (fix in future PR). + // + // There is a time-of-check-time-of-use (TOCTOU) window between the retention + // check below (using get_object_info + check_retention_for_modification) and + // the actual update performed later in put_object_metadata. + // + // In theory: + // * Thread A reads retention mode = GOVERNANCE and checks the bypass header. + // * Thread B updates retention to COMPLIANCE mode. + // * Thread A then proceeds to modify retention, still assuming GOVERNANCE, + // and effectively bypasses what is now COMPLIANCE mode. + // + // This would violate the S3 spec, which states that COMPLIANCE-mode retention + // cannot be modified even with a bypass header. + // + // Possible fixes (to be implemented in a future change): + // 1. Pass the expected retention mode down to the storage layer and verify + // it has not changed immediately before the update. + // 2. Use optimistic concurrency (e.g., version/etag) so that the update + // fails if the object changed between check and update. + // 3. Perform the retention check inside the same lock/transaction scope as + // the metadata update within the storage layer. + // + // Current mitigation: the storage layer provides a fast_lock_manager, which + // offers some protection, but it does not fully eliminate this race. + let check_opts: ObjectOptions = get_opts(&bucket, &key, version_id.clone(), None, &req.headers) + .await + .map_err(ApiError::from)?; + + if let Ok(existing_obj_info) = store.get_object_info(&bucket, &key, &check_opts).await { + let bypass_governance = has_bypass_governance_header(&req.headers); + if let Some(block_reason) = + check_retention_for_modification(&existing_obj_info.user_defined, new_mode, new_retain_until, bypass_governance) + { + return Err(S3Error::with_message(S3ErrorCode::AccessDenied, block_reason.error_message())); + } + } + + let eval_metadata = parse_object_lock_retention(retention)?; + + let mut opts: ObjectOptions = get_opts(&bucket, &key, version_id, None, &req.headers) + .await + .map_err(ApiError::from)?; + opts.eval_metadata = Some(eval_metadata); + + let object_info = store.put_object_metadata(&bucket, &key, &opts).await.map_err(|e| { + error!("put_object_metadata failed, {}", e.to_string()); + s3_error!(InternalError, "{}", e.to_string()) + })?; + + let output = PutObjectRetentionOutput { + request_charged: Some(RequestCharged::from_static(RequestCharged::REQUESTER)), + }; + + let version_id = req.input.version_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string()); + helper = helper.object(object_info).version_id(version_id); + + let result = Ok(S3Response::new(output)); + let _ = helper.complete(&result); + result } #[instrument(level = "debug", skip(self, req))] diff --git a/rustfs/src/storage/ecfs_test.rs b/rustfs/src/storage/ecfs_test.rs index 77c127bfd..71572e357 100644 --- a/rustfs/src/storage/ecfs_test.rs +++ b/rustfs/src/storage/ecfs_test.rs @@ -37,8 +37,8 @@ mod tests { use s3s::dto::{ CORSConfiguration, CORSRule, DeleteObjectTaggingInput, Delimiter, GetObjectAclInput, GetObjectLegalHoldInput, GetObjectRetentionInput, GetObjectTaggingInput, LambdaFunctionConfiguration, ObjectLockLegalHold, - ObjectLockLegalHoldStatus, ObjectLockRetention, ObjectLockRetentionMode, PutObjectTaggingInput, QueueConfiguration, Tag, - Tagging, TopicConfiguration, + ObjectLockLegalHoldStatus, ObjectLockRetention, ObjectLockRetentionMode, PutObjectLegalHoldInput, + PutObjectRetentionInput, PutObjectTaggingInput, QueueConfiguration, Tag, Tagging, TopicConfiguration, }; use s3s::{S3, S3Error, S3ErrorCode, S3Request, s3_error}; use time::OffsetDateTime; @@ -225,6 +225,32 @@ mod tests { assert_eq!(err.code(), &S3ErrorCode::InternalError); } + #[tokio::test] + async fn test_put_object_legal_hold_returns_internal_error_when_store_uninitialized() { + let input = PutObjectLegalHoldInput::builder() + .bucket("test-bucket".to_string()) + .key("test-key".to_string()) + .build() + .unwrap(); + + let fs = FS::new(); + let err = fs.put_object_legal_hold(build_request(input, Method::PUT)).await.unwrap_err(); + assert_eq!(err.code(), &S3ErrorCode::InternalError); + } + + #[tokio::test] + async fn test_put_object_retention_returns_internal_error_when_store_uninitialized() { + let input = PutObjectRetentionInput::builder() + .bucket("test-bucket".to_string()) + .key("test-key".to_string()) + .build() + .unwrap(); + + let fs = FS::new(); + let err = fs.put_object_retention(build_request(input, Method::PUT)).await.unwrap_err(); + assert_eq!(err.code(), &S3ErrorCode::InternalError); + } + #[tokio::test] async fn test_get_object_tagging_returns_internal_error_when_store_uninitialized() { let input = GetObjectTaggingInput::builder()