mirror of
https://github.com/rustfs/rustfs.git
synced 2026-06-03 15:39:36 +08:00
fix(s3): normalize GetObjectAttributes ETag XML response (#2002)
This commit is contained in:
@@ -45,6 +45,7 @@ const _ERR_XML_NOT_WELL_FORMED: &str =
|
||||
const ERR_LIFECYCLE_BUCKET_LOCKED: &str =
|
||||
"ExpiredObjectAllVersions element and DelMarkerExpiration action cannot be used on an retention bucket";
|
||||
const ERR_LIFECYCLE_TOO_MANY_RULES: &str = "Lifecycle configuration should have at most 1000 rules";
|
||||
const ERR_LIFECYCLE_INVALID_EXPIRATION_DAYS: &str = "Lifecycle expiration days must be greater than 0";
|
||||
|
||||
pub use rustfs_common::metrics::IlmAction;
|
||||
|
||||
@@ -232,6 +233,13 @@ impl Lifecycle for BucketLifecycleConfiguration {
|
||||
}
|
||||
|
||||
for r in &self.rules {
|
||||
if let Some(expiration) = &r.expiration {
|
||||
if let Some(days) = expiration.days {
|
||||
if days <= 0 {
|
||||
return Err(std::io::Error::other(ERR_LIFECYCLE_INVALID_EXPIRATION_DAYS));
|
||||
}
|
||||
}
|
||||
}
|
||||
r.validate()?;
|
||||
/*if let Some(object_lock_enabled) = lr.object_lock_enabled.as_ref() {
|
||||
if let Some(expiration) = r.expiration.as_ref() {
|
||||
@@ -770,3 +778,59 @@ impl Default for TransitionOptions {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn validate_rejects_non_positive_expiration_days() {
|
||||
let lc = BucketLifecycleConfiguration {
|
||||
rules: vec![LifecycleRule {
|
||||
status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
|
||||
expiration: Some(LifecycleExpiration {
|
||||
days: Some(0),
|
||||
..Default::default()
|
||||
}),
|
||||
abort_incomplete_multipart_upload: None,
|
||||
filter: None,
|
||||
id: None,
|
||||
noncurrent_version_expiration: None,
|
||||
noncurrent_version_transitions: None,
|
||||
prefix: None,
|
||||
transitions: None,
|
||||
}],
|
||||
};
|
||||
|
||||
let err = lc
|
||||
.validate(&ObjectLockConfiguration::default())
|
||||
.await
|
||||
.expect_err("expected validation error");
|
||||
|
||||
assert_eq!(err.to_string(), ERR_LIFECYCLE_INVALID_EXPIRATION_DAYS);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn validate_accepts_positive_expiration_days() {
|
||||
let lc = BucketLifecycleConfiguration {
|
||||
rules: vec![LifecycleRule {
|
||||
status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
|
||||
expiration: Some(LifecycleExpiration {
|
||||
days: Some(30),
|
||||
..Default::default()
|
||||
}),
|
||||
abort_incomplete_multipart_upload: None,
|
||||
filter: None,
|
||||
id: None,
|
||||
noncurrent_version_expiration: None,
|
||||
noncurrent_version_transitions: None,
|
||||
prefix: None,
|
||||
transitions: None,
|
||||
}],
|
||||
};
|
||||
|
||||
lc.validate(&ObjectLockConfiguration::default())
|
||||
.await
|
||||
.expect("expected validation to pass");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -387,8 +387,12 @@ impl ObjectInfo {
|
||||
}
|
||||
|
||||
// Check if object is encrypted
|
||||
// Encrypted objects store original size in x-rustfs-encryption-original-size metadata
|
||||
if let Some(size_str) = self.user_defined.get("x-rustfs-encryption-original-size")
|
||||
// Managed SSE stores original size in x-rustfs-encryption-original-size metadata
|
||||
// SSE-C stores original size in x-amz-server-side-encryption-customer-original-size
|
||||
if let Some(size_str) = self
|
||||
.user_defined
|
||||
.get("x-rustfs-encryption-original-size")
|
||||
.or_else(|| self.user_defined.get("x-amz-server-side-encryption-customer-original-size"))
|
||||
&& !size_str.is_empty()
|
||||
{
|
||||
let size = size_str
|
||||
@@ -1010,3 +1014,102 @@ pub struct ObjectInfoOrErr {
|
||||
pub item: Option<ObjectInfo>,
|
||||
pub err: Option<Error>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn get_actual_size_prefers_actual_size_field() {
|
||||
let info = ObjectInfo {
|
||||
size: 5,
|
||||
actual_size: 10,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert_eq!(info.get_actual_size().unwrap(), 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_actual_size_uses_compressed_metadata_size() {
|
||||
let user_defined = {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(format!("{RESERVED_METADATA_PREFIX_LOWER}compression"), "zstd".to_string());
|
||||
map.insert(format!("{RESERVED_METADATA_PREFIX_LOWER}actual-size"), "42".to_string());
|
||||
map
|
||||
};
|
||||
|
||||
let info = ObjectInfo {
|
||||
size: 100,
|
||||
actual_size: 0,
|
||||
user_defined,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert_eq!(info.get_actual_size().unwrap(), 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_actual_size_falls_back_to_encrypted_original_size_metadata() {
|
||||
let user_defined = {
|
||||
let mut map = HashMap::new();
|
||||
map.insert("x-amz-server-side-encryption-customer-original-size".to_string(), "77".to_string());
|
||||
map
|
||||
};
|
||||
|
||||
let info = ObjectInfo {
|
||||
size: 100,
|
||||
actual_size: 0,
|
||||
user_defined,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert_eq!(info.get_actual_size().unwrap(), 77);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_actual_size_uses_compressed_parts_actual_size_when_metadata_missing() {
|
||||
let user_defined = {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(format!("{RESERVED_METADATA_PREFIX_LOWER}compression"), "zstd".to_string());
|
||||
map
|
||||
};
|
||||
|
||||
let info = ObjectInfo {
|
||||
size: 12,
|
||||
actual_size: 0,
|
||||
user_defined,
|
||||
parts: vec![
|
||||
rustfs_filemeta::ObjectPartInfo {
|
||||
actual_size: 4,
|
||||
..Default::default()
|
||||
},
|
||||
rustfs_filemeta::ObjectPartInfo {
|
||||
actual_size: 5,
|
||||
..Default::default()
|
||||
},
|
||||
],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert_eq!(info.get_actual_size().unwrap(), 9);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_actual_size_returns_error_when_compressed_parts_missing_and_size_mismatch() {
|
||||
let user_defined = {
|
||||
let mut map = HashMap::new();
|
||||
map.insert(format!("{RESERVED_METADATA_PREFIX_LOWER}compression"), "zstd".to_string());
|
||||
map
|
||||
};
|
||||
|
||||
let info = ObjectInfo {
|
||||
size: 12,
|
||||
actual_size: 0,
|
||||
user_defined,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert!(info.get_actual_size().is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -465,7 +465,7 @@ impl DefaultBucketUsecase {
|
||||
.await
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
Ok(S3Response::new(DeleteBucketEncryptionOutput::default()))
|
||||
Ok(S3Response::with_status(DeleteBucketEncryptionOutput::default(), StatusCode::NO_CONTENT))
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
@@ -643,8 +643,11 @@ impl DefaultBucketUsecase {
|
||||
let server_side_encryption_configuration = match metadata_sys::get_sse_config(&bucket).await {
|
||||
Ok((cfg, _)) => Some(cfg),
|
||||
Err(err) => {
|
||||
if err == StorageError::ConfigNotFound {
|
||||
return Err(s3_error!(ServerSideEncryptionConfigurationNotFoundError));
|
||||
}
|
||||
warn!("get_sse_config err {:?}", err);
|
||||
None
|
||||
return Err(ApiError::from(err).into());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1079,15 +1082,21 @@ impl DefaultBucketUsecase {
|
||||
|
||||
let Some(input_cfg) = lifecycle_configuration else { return Err(s3_error!(InvalidArgument)) };
|
||||
|
||||
let rcfg = metadata_sys::get_object_lock_config(&bucket).await;
|
||||
if let Ok(rcfg) = rcfg
|
||||
&& let Err(err) = rustfs_ecstore::bucket::lifecycle::lifecycle::Lifecycle::validate(&input_cfg, &rcfg.0).await
|
||||
{
|
||||
return Err(S3Error::with_message(S3ErrorCode::Custom("ValidateFailed".into()), err.to_string()));
|
||||
let rcfg = match metadata_sys::get_object_lock_config(&bucket).await {
|
||||
Ok((cfg, _)) => cfg,
|
||||
Err(StorageError::ConfigNotFound) => ObjectLockConfiguration::default(),
|
||||
Err(err) => {
|
||||
warn!("get_object_lock_config err {:?}", err);
|
||||
return Err(ApiError::from(err).into());
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = rustfs_ecstore::bucket::lifecycle::lifecycle::Lifecycle::validate(&input_cfg, &rcfg).await {
|
||||
return Err(s3_error!(InvalidArgument, "{err}"));
|
||||
}
|
||||
|
||||
if let Err(err) = validate_transition_tier(&input_cfg).await {
|
||||
return Err(S3Error::with_message(S3ErrorCode::Custom("CustomError".into()), err.to_string()));
|
||||
return Err(s3_error!(InvalidArgument, "{err}"));
|
||||
}
|
||||
|
||||
let data = serialize_config(&input_cfg)?;
|
||||
|
||||
@@ -28,6 +28,7 @@ use crate::storage::options::{
|
||||
copy_dst_opts, copy_src_opts, del_opts, extract_metadata, extract_metadata_from_mime_with_object_name,
|
||||
filter_object_metadata, get_content_sha256, get_opts, put_opts,
|
||||
};
|
||||
use crate::storage::s3_api::multipart::parse_list_parts_params;
|
||||
use crate::storage::s3_api::{restore, select};
|
||||
use crate::storage::*;
|
||||
use bytes::Bytes;
|
||||
@@ -84,7 +85,8 @@ use rustfs_utils::http::{
|
||||
headers::{
|
||||
AMZ_DECODED_CONTENT_LENGTH, AMZ_OBJECT_LOCK_LEGAL_HOLD, AMZ_OBJECT_LOCK_LEGAL_HOLD_LOWER, AMZ_OBJECT_LOCK_MODE,
|
||||
AMZ_OBJECT_LOCK_MODE_LOWER, AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE, AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE_LOWER,
|
||||
AMZ_OBJECT_TAGGING, AMZ_RESTORE_EXPIRY_DAYS, AMZ_RESTORE_REQUEST_DATE, AMZ_TAG_COUNT, RESERVED_METADATA_PREFIX_LOWER,
|
||||
AMZ_OBJECT_TAGGING, AMZ_RESTORE_EXPIRY_DAYS, AMZ_RESTORE_REQUEST_DATE, AMZ_STORAGE_CLASS, AMZ_TAG_COUNT,
|
||||
RESERVED_METADATA_PREFIX_LOWER,
|
||||
},
|
||||
};
|
||||
use rustfs_utils::path::{is_dir_object, path_join_buf};
|
||||
@@ -1575,33 +1577,223 @@ impl DefaultObjectUsecase {
|
||||
}
|
||||
|
||||
let mut helper = OperationHelper::new(&req, EventName::ObjectAccessedAttributes, "s3:GetObjectAttributes");
|
||||
let GetObjectAttributesInput { bucket, key, .. } = req.input.clone();
|
||||
let GetObjectAttributesInput {
|
||||
bucket,
|
||||
key,
|
||||
max_parts,
|
||||
object_attributes,
|
||||
part_number_marker,
|
||||
version_id,
|
||||
sse_customer_key,
|
||||
sse_customer_key_md5,
|
||||
..
|
||||
} = req.input;
|
||||
|
||||
let Some(store) = new_object_layer_fn() else {
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, "Not init".to_string()));
|
||||
};
|
||||
|
||||
if let Err(e) = store
|
||||
.get_object_reader(&bucket, &key, None, HeaderMap::new(), &ObjectOptions::default())
|
||||
let opts: ObjectOptions = get_opts(&bucket, &key, version_id.clone(), None, &req.headers)
|
||||
.await
|
||||
{
|
||||
return Err(S3Error::with_message(S3ErrorCode::InternalError, format!("{e}")));
|
||||
.map_err(ApiError::from)?;
|
||||
|
||||
let info = match store.get_object_info(&bucket, &key, &opts).await {
|
||||
Ok(info) => info,
|
||||
Err(err) => {
|
||||
if is_err_object_not_found(&err) || is_err_version_not_found(&err) {
|
||||
if is_dir_object(&key) {
|
||||
let has_children = match probe_prefix_has_children(store, &bucket, &key, false).await {
|
||||
Ok(has_children) => has_children,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to probe children for object attributes (bucket: {}, key: {}): {}",
|
||||
bucket, key, e
|
||||
);
|
||||
false
|
||||
}
|
||||
};
|
||||
let msg = head_prefix_not_found_message(&bucket, &key, has_children);
|
||||
return Err(S3Error::with_message(S3ErrorCode::NoSuchKey, msg));
|
||||
}
|
||||
return Err(S3Error::new(S3ErrorCode::NoSuchKey));
|
||||
}
|
||||
return Err(ApiError::from(err).into());
|
||||
}
|
||||
};
|
||||
|
||||
if info.delete_marker {
|
||||
if opts.version_id.is_none() {
|
||||
return Err(S3Error::new(S3ErrorCode::NoSuchKey));
|
||||
}
|
||||
return Err(S3Error::new(S3ErrorCode::MethodNotAllowed));
|
||||
}
|
||||
|
||||
validate_ssec_for_read(&info.user_defined, sse_customer_key.as_ref(), sse_customer_key_md5.as_ref())?;
|
||||
|
||||
let metadata_map = info.user_defined.clone();
|
||||
let storage_class = info
|
||||
.storage_class
|
||||
.clone()
|
||||
.or_else(|| metadata_map.get(AMZ_STORAGE_CLASS).cloned())
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(StorageClass::from);
|
||||
|
||||
debug!(
|
||||
"GetObjectAttributes raw object_attributes={:?}",
|
||||
object_attributes.iter().map(|value| value.as_str()).collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
let requested = |name: &'static str| -> bool { object_attributes_requested(&object_attributes, name) };
|
||||
|
||||
let e_tag = if requested(ObjectAttributes::ETAG) {
|
||||
info.etag.as_ref().map(|etag| to_s3s_etag(etag))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let object_size = if requested(ObjectAttributes::OBJECT_SIZE) {
|
||||
Some(info.get_actual_size().map_err(ApiError::from)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let checksum = if requested(ObjectAttributes::CHECKSUM) {
|
||||
let (checksums, _is_multipart) = info.decrypt_checksums(0, &req.headers).map_err(ApiError::from)?;
|
||||
let mut checksum_crc32 = None;
|
||||
let mut checksum_crc32c = None;
|
||||
let mut checksum_sha1 = None;
|
||||
let mut checksum_sha256 = None;
|
||||
let mut checksum_crc64nvme = None;
|
||||
let mut checksum_type = None;
|
||||
|
||||
for (k, v) in checksums {
|
||||
if k == AMZ_CHECKSUM_TYPE {
|
||||
checksum_type = Some(ChecksumType::from(v));
|
||||
continue;
|
||||
}
|
||||
match rustfs_rio::ChecksumType::from_string(k.as_str()) {
|
||||
rustfs_rio::ChecksumType::CRC32 => checksum_crc32 = Some(v),
|
||||
rustfs_rio::ChecksumType::CRC32C => checksum_crc32c = Some(v),
|
||||
rustfs_rio::ChecksumType::SHA1 => checksum_sha1 = Some(v),
|
||||
rustfs_rio::ChecksumType::SHA256 => checksum_sha256 = Some(v),
|
||||
rustfs_rio::ChecksumType::CRC64_NVME => checksum_crc64nvme = Some(v),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
Some(Checksum {
|
||||
checksum_crc32,
|
||||
checksum_crc32c,
|
||||
checksum_sha1,
|
||||
checksum_sha256,
|
||||
checksum_crc64nvme,
|
||||
checksum_type,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let object_parts = if requested(ObjectAttributes::OBJECT_PARTS) && info.is_multipart() {
|
||||
let params = parse_list_parts_params(part_number_marker, max_parts)?;
|
||||
let mut parts = Vec::new();
|
||||
let mut marker = params.part_number_marker;
|
||||
let max_parts = params.max_parts;
|
||||
let mut start_at = 0usize;
|
||||
|
||||
if let Some(marker_value) = marker {
|
||||
if let Some(index) = info.parts.iter().position(|part| part.number == marker_value) {
|
||||
start_at = index + 1;
|
||||
} else {
|
||||
marker = None;
|
||||
}
|
||||
}
|
||||
|
||||
let max_parts: i32 = max_parts.try_into().map_err(|_| {
|
||||
S3Error::with_message(S3ErrorCode::InvalidArgument, "max-parts value is out of range".to_string())
|
||||
})?;
|
||||
let end = (start_at + params.max_parts).min(info.parts.len());
|
||||
let is_truncated = end < info.parts.len();
|
||||
|
||||
for part in &info.parts[start_at..end] {
|
||||
let (checksums, _is_multipart) = info.decrypt_checksums(part.number, &req.headers).map_err(ApiError::from)?;
|
||||
let mut checksum_crc32 = None;
|
||||
let mut checksum_crc32c = None;
|
||||
let mut checksum_sha1 = None;
|
||||
let mut checksum_sha256 = None;
|
||||
let mut checksum_crc64nvme = None;
|
||||
|
||||
for (k, v) in checksums {
|
||||
match rustfs_rio::ChecksumType::from_string(k.as_str()) {
|
||||
rustfs_rio::ChecksumType::CRC32 => checksum_crc32 = Some(v),
|
||||
rustfs_rio::ChecksumType::CRC32C => checksum_crc32c = Some(v),
|
||||
rustfs_rio::ChecksumType::SHA1 => checksum_sha1 = Some(v),
|
||||
rustfs_rio::ChecksumType::SHA256 => checksum_sha256 = Some(v),
|
||||
rustfs_rio::ChecksumType::CRC64_NVME => checksum_crc64nvme = Some(v),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
let part_size = if part.actual_size > 0 {
|
||||
part.actual_size
|
||||
} else {
|
||||
part.size.try_into().map_err(|_| {
|
||||
S3Error::with_message(S3ErrorCode::InvalidArgument, "Part size value is out of range".to_string())
|
||||
})?
|
||||
};
|
||||
|
||||
parts.push(ObjectPart {
|
||||
checksum_crc32,
|
||||
checksum_crc32c,
|
||||
checksum_sha1,
|
||||
checksum_sha256,
|
||||
checksum_crc64nvme,
|
||||
part_number: i32::try_from(part.number).ok(),
|
||||
size: Some(part_size),
|
||||
});
|
||||
}
|
||||
|
||||
let part_number_marker = marker.and_then(|v| i32::try_from(v).ok());
|
||||
let next_part_number_marker = parts.last().and_then(|part| part.part_number);
|
||||
|
||||
Some(GetObjectAttributesParts {
|
||||
is_truncated: Some(is_truncated),
|
||||
max_parts: Some(max_parts),
|
||||
next_part_number_marker,
|
||||
part_number_marker,
|
||||
parts: Some(parts),
|
||||
total_parts_count: Some(i32::try_from(info.parts.len()).map_err(|_| {
|
||||
S3Error::with_message(S3ErrorCode::InvalidArgument, "Part count is out of range".to_string())
|
||||
})?),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let version_id = if BucketVersioningSys::prefix_enabled(&bucket, &key).await {
|
||||
info.version_id.map(|vid| {
|
||||
if vid == Uuid::nil() {
|
||||
"null".to_string()
|
||||
} else {
|
||||
vid.to_string()
|
||||
}
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let output = GetObjectAttributesOutput {
|
||||
delete_marker: None,
|
||||
object_parts: None,
|
||||
checksum,
|
||||
delete_marker: if info.delete_marker { Some(true) } else { None },
|
||||
e_tag,
|
||||
last_modified: info.mod_time.map(Timestamp::from),
|
||||
object_parts,
|
||||
object_size,
|
||||
storage_class,
|
||||
version_id: version_id.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let version_id = req.input.version_id.clone().unwrap_or_default();
|
||||
helper = helper
|
||||
.object(ObjectInfo {
|
||||
name: key.clone(),
|
||||
bucket,
|
||||
..Default::default()
|
||||
})
|
||||
.version_id(version_id);
|
||||
helper = helper.object(info).version_id(version_id.unwrap_or_default());
|
||||
|
||||
let result = Ok(S3Response::new(output));
|
||||
let _ = helper.complete(&result);
|
||||
@@ -3423,6 +3615,15 @@ impl DefaultObjectUsecase {
|
||||
}
|
||||
}
|
||||
|
||||
fn object_attributes_requested(object_attributes: &[ObjectAttributes], name: &'static str) -> bool {
|
||||
object_attributes.iter().any(|value| {
|
||||
value.as_str().split(',').any(|part| {
|
||||
part.trim_matches(|c: char| c.is_whitespace() || c == '"' || c == '\'')
|
||||
.eq_ignore_ascii_case(name)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -3595,6 +3796,42 @@ mod tests {
|
||||
assert_eq!(err.code(), &S3ErrorCode::InternalError);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn object_attributes_requested_with_single_value() {
|
||||
let object_attributes = vec![ObjectAttributes::from_static(ObjectAttributes::ETAG)];
|
||||
|
||||
assert!(object_attributes_requested(&object_attributes, ObjectAttributes::ETAG));
|
||||
assert!(!object_attributes_requested(&object_attributes, ObjectAttributes::OBJECT_SIZE));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn object_attributes_requested_with_comma_separated_values() {
|
||||
let object_attributes = vec![
|
||||
ObjectAttributes::from_static("ObjectParts,etag"),
|
||||
ObjectAttributes::from_static("StorageClass"),
|
||||
];
|
||||
|
||||
assert!(object_attributes_requested(&object_attributes, ObjectAttributes::OBJECT_PARTS));
|
||||
assert!(object_attributes_requested(&object_attributes, ObjectAttributes::ETAG));
|
||||
assert!(!object_attributes_requested(&object_attributes, ObjectAttributes::OBJECT_SIZE));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn object_attributes_requested_with_quotes_and_spaces() {
|
||||
let object_attributes = vec![ObjectAttributes::from_static("'ObjectSize', \"Checksum\" , \"Etag\"")];
|
||||
|
||||
assert!(object_attributes_requested(&object_attributes, ObjectAttributes::OBJECT_SIZE));
|
||||
assert!(object_attributes_requested(&object_attributes, ObjectAttributes::CHECKSUM));
|
||||
assert!(object_attributes_requested(&object_attributes, ObjectAttributes::ETAG));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn object_attributes_requested_returns_false_for_missing_name() {
|
||||
let object_attributes = vec![ObjectAttributes::from_static("Checksum")];
|
||||
|
||||
assert!(!object_attributes_requested(&object_attributes, ObjectAttributes::OBJECT_SIZE));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_get_object_legal_hold_returns_internal_error_when_store_uninitialized() {
|
||||
let input = GetObjectLegalHoldInput::builder()
|
||||
|
||||
@@ -21,7 +21,7 @@ use crate::server::{
|
||||
ReadinessGateLayer, RemoteAddr, ServiceState, ServiceStateManager,
|
||||
compress::{CompressionConfig, CompressionPredicate},
|
||||
hybrid::hybrid,
|
||||
layer::{ConditionalCorsLayer, RedirectLayer},
|
||||
layer::{ConditionalCorsLayer, ObjectAttributesEtagFixLayer, RedirectLayer},
|
||||
};
|
||||
use crate::storage;
|
||||
use crate::storage::tonic_service::make_server;
|
||||
@@ -693,6 +693,7 @@ fn process_connection(
|
||||
// Compress responses based on whitelist configuration
|
||||
// Only compresses when enabled and matches configured extensions/MIME types
|
||||
.layer(CompressionLayer::new().compress_when(CompressionPredicate::new(compression_config)))
|
||||
.layer(ObjectAttributesEtagFixLayer)
|
||||
// Conditional CORS layer: only applies to S3 API requests (not Admin, not Console)
|
||||
// Admin has its own CORS handling in router.rs
|
||||
// Console has its own CORS layer in setup_console_middleware_stack()
|
||||
|
||||
@@ -15,9 +15,12 @@
|
||||
use crate::admin::console::is_console_path;
|
||||
use crate::server::cors;
|
||||
use crate::server::hybrid::HybridBody;
|
||||
use crate::server::{ADMIN_PREFIX, RPC_PREFIX};
|
||||
use crate::server::{ADMIN_PREFIX, CONSOLE_PREFIX, RPC_PREFIX, RUSTFS_ADMIN_PREFIX};
|
||||
use crate::storage::apply_cors_headers;
|
||||
use bytes::Bytes;
|
||||
use http::{HeaderMap, HeaderValue, Method, Request as HttpRequest, Response, StatusCode};
|
||||
use http_body::Body;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::body::Incoming;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
@@ -95,6 +98,152 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ObjectAttributesEtagFixLayer;
|
||||
|
||||
impl<S> Layer<S> for ObjectAttributesEtagFixLayer {
|
||||
type Service = ObjectAttributesEtagFixService<S>;
|
||||
|
||||
fn layer(&self, inner: S) -> Self::Service {
|
||||
ObjectAttributesEtagFixService { inner }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ObjectAttributesEtagFixService<S> {
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S, RestBody, GrpcBody> Service<HttpRequest<Incoming>> for ObjectAttributesEtagFixService<S>
|
||||
where
|
||||
S: Service<HttpRequest<Incoming>, Response = Response<HybridBody<RestBody, GrpcBody>>> + Clone + Send + 'static,
|
||||
S::Future: Send + 'static,
|
||||
S::Error: Send + 'static,
|
||||
RestBody: Body<Data = Bytes> + From<Bytes> + Send + 'static,
|
||||
RestBody::Error: Into<S::Error> + Send + 'static,
|
||||
GrpcBody: Send + 'static,
|
||||
{
|
||||
type Response = Response<HybridBody<RestBody, GrpcBody>>;
|
||||
type Error = S::Error;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: HttpRequest<Incoming>) -> Self::Future {
|
||||
let is_target = is_object_attributes_request(&req);
|
||||
let mut inner = self.inner.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
let response = inner.call(req).await?;
|
||||
let (parts, body) = response.into_parts();
|
||||
let should_fix = is_target && parts.status.is_success() && is_xml_response(&parts.headers);
|
||||
|
||||
let response = match body {
|
||||
HybridBody::Rest { rest_body } => {
|
||||
if !should_fix {
|
||||
Response::from_parts(parts, HybridBody::Rest { rest_body })
|
||||
} else {
|
||||
let rest_body = fix_object_attributes_etag_in_xml(rest_body).await.map_err(Into::into)?;
|
||||
|
||||
let mut parts = parts;
|
||||
parts.headers.remove(http::header::CONTENT_LENGTH);
|
||||
|
||||
Response::from_parts(parts, HybridBody::Rest { rest_body })
|
||||
}
|
||||
}
|
||||
HybridBody::Grpc { grpc_body } => Response::from_parts(parts, HybridBody::Grpc { grpc_body }),
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn is_xml_response(headers: &HeaderMap) -> bool {
|
||||
let is_xml = headers
|
||||
.get(http::header::CONTENT_TYPE)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|content_type| content_type.to_ascii_lowercase().contains("xml"))
|
||||
.unwrap_or(false);
|
||||
if !is_xml {
|
||||
return false;
|
||||
}
|
||||
|
||||
match headers
|
||||
.get(http::header::CONTENT_ENCODING)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
{
|
||||
Some(encoding) => encoding.trim().is_empty() || encoding.eq_ignore_ascii_case("identity"),
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
|
||||
async fn fix_object_attributes_etag_in_xml<RestBody>(body: RestBody) -> Result<RestBody, RestBody::Error>
|
||||
where
|
||||
RestBody: Body<Data = Bytes> + From<Bytes>,
|
||||
{
|
||||
let bytes = BodyExt::collect(body).await?.to_bytes();
|
||||
let xml = String::from_utf8(bytes.to_vec()).unwrap_or_else(|_| String::from_utf8_lossy(&bytes).into_owned());
|
||||
let fixed = strip_quotes_from_first_etag(xml);
|
||||
Ok(RestBody::from(Bytes::from(fixed)))
|
||||
}
|
||||
|
||||
fn strip_quotes_from_first_etag(xml: String) -> String {
|
||||
let Some(start) = xml.find("<ETag>") else {
|
||||
return xml;
|
||||
};
|
||||
let value_start = start + "<ETag>".len();
|
||||
let value_rest = &xml[value_start..];
|
||||
let Some(end_offset) = value_rest.find("</ETag>") else {
|
||||
return xml;
|
||||
};
|
||||
let value_end = value_start + end_offset;
|
||||
let raw = &xml[value_start..value_end];
|
||||
|
||||
let Some(trimmed) = raw.strip_prefix('"').and_then(|v| v.strip_suffix('"')) else {
|
||||
return xml;
|
||||
};
|
||||
|
||||
let mut fixed = String::with_capacity(xml.len() - 2);
|
||||
fixed.push_str(&xml[..value_start]);
|
||||
fixed.push_str(trimmed);
|
||||
fixed.push_str(&xml[value_end..]);
|
||||
fixed
|
||||
}
|
||||
|
||||
fn is_object_attributes_request(req: &HttpRequest<Incoming>) -> bool {
|
||||
if req.method() != Method::GET {
|
||||
return false;
|
||||
}
|
||||
|
||||
let path = req.uri().path();
|
||||
if path.starts_with(ADMIN_PREFIX)
|
||||
|| path.starts_with(RUSTFS_ADMIN_PREFIX)
|
||||
|| path.starts_with(CONSOLE_PREFIX)
|
||||
|| path.starts_with(RPC_PREFIX)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
let has_object_attributes_query = req.uri().query().is_some_and(|query| {
|
||||
query.split('&').any(|part| {
|
||||
let (name, _value) = part.split_once('=').unwrap_or((part, ""));
|
||||
matches!(
|
||||
name.to_ascii_lowercase().as_str(),
|
||||
"attributes" | "object-attributes" | "x-amz-object-attributes"
|
||||
)
|
||||
})
|
||||
});
|
||||
let has_object_attributes_header = req
|
||||
.headers()
|
||||
.get(http::header::HeaderName::from_static("x-amz-object-attributes"))
|
||||
.is_some();
|
||||
|
||||
has_object_attributes_query || has_object_attributes_header
|
||||
}
|
||||
|
||||
/// Conditional CORS layer that only applies to S3 API requests
|
||||
/// (not Admin, not Console, not RPC)
|
||||
#[derive(Clone)]
|
||||
@@ -267,3 +416,54 @@ where
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use http_body_util::BodyExt;
|
||||
use http_body_util::Full;
|
||||
|
||||
#[test]
|
||||
fn test_strip_quotes_from_first_etag_removes_quotes() {
|
||||
let input = String::from("<GetObjectAttributesOutput><ETag>\"abc\"</ETag></GetObjectAttributesOutput>");
|
||||
let output = strip_quotes_from_first_etag(input);
|
||||
|
||||
assert_eq!(output, "<GetObjectAttributesOutput><ETag>abc</ETag></GetObjectAttributesOutput>");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_strip_quotes_from_first_etag_keeps_non_quoted_value() {
|
||||
let input = String::from("<GetObjectAttributesOutput><ETag>abc</ETag></GetObjectAttributesOutput>");
|
||||
let output = strip_quotes_from_first_etag(input.clone());
|
||||
|
||||
assert_eq!(output, input);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_strip_quotes_from_first_etag_only_first_occurrence() {
|
||||
let input =
|
||||
String::from("<GetObjectAttributesOutput><ETag>\"first\"</ETag><ETag>\"second\"</ETag></GetObjectAttributesOutput>");
|
||||
let output = strip_quotes_from_first_etag(input);
|
||||
|
||||
assert_eq!(
|
||||
output,
|
||||
"<GetObjectAttributesOutput><ETag>first</ETag><ETag>\"second\"</ETag></GetObjectAttributesOutput>"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fix_object_attributes_etag_in_xml() {
|
||||
let body = Full::from(Bytes::from(
|
||||
"<GetObjectAttributesOutput><ETag>\"abc\"</ETag><Checksum>CRC32C</Checksum></GetObjectAttributesOutput>",
|
||||
));
|
||||
let fixed = fix_object_attributes_etag_in_xml(body).await.unwrap();
|
||||
let bytes = BodyExt::collect(fixed).await.unwrap().to_bytes();
|
||||
|
||||
assert_eq!(
|
||||
bytes,
|
||||
Bytes::from_static(
|
||||
b"<GetObjectAttributesOutput><ETag>abc</ETag><Checksum>CRC32C</Checksum></GetObjectAttributesOutput>",
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# - SSE-C: Server-side encryption with customer-provided keys
|
||||
# - Object ownership: Bucket ownership controls
|
||||
#
|
||||
# Total: 159 tests
|
||||
# Total: 174 tests
|
||||
|
||||
test_basic_key_count
|
||||
test_bucket_create_naming_bad_short_one
|
||||
@@ -153,6 +153,9 @@ test_upload_part_copy_percent_encoded_key
|
||||
test_api_error_from_storage_error_mappings
|
||||
test_get_object_torrent
|
||||
|
||||
# Object attributes
|
||||
test_get_checksum_object_attributes
|
||||
|
||||
# SSE-C encryption tests
|
||||
test_encryption_sse_c_method_head
|
||||
test_encryption_sse_c_present
|
||||
@@ -200,3 +203,12 @@ test_object_put_acl_mtime
|
||||
|
||||
# Object ownership
|
||||
test_create_bucket_no_ownership_controls
|
||||
|
||||
# Bucket encryption
|
||||
test_put_bucket_encryption_kms
|
||||
test_put_bucket_encryption_s3
|
||||
test_get_bucket_encryption_kms
|
||||
test_get_bucket_encryption_s3
|
||||
test_delete_bucket_encryption_kms
|
||||
test_delete_bucket_encryption_s3
|
||||
test_lifecycle_expiration_days0
|
||||
|
||||
@@ -25,18 +25,13 @@ test_bucket_policy_put_obj_kms_s3
|
||||
test_bucket_policy_put_obj_s3_kms
|
||||
test_copy_enc
|
||||
test_copy_part_enc
|
||||
test_delete_bucket_encryption_kms
|
||||
test_delete_bucket_encryption_s3
|
||||
test_encryption_key_no_sse_c
|
||||
test_encryption_sse_c_invalid_md5
|
||||
test_encryption_sse_c_multipart_bad_download
|
||||
test_encryption_sse_c_no_key
|
||||
test_encryption_sse_c_no_md5
|
||||
test_get_bucket_encryption_kms
|
||||
test_get_bucket_encryption_s3
|
||||
test_get_versioned_object_attributes
|
||||
test_lifecycle_delete
|
||||
test_lifecycle_expiration_days0
|
||||
test_lifecycle_expiration_header_put
|
||||
test_lifecycle_get
|
||||
test_lifecycle_get_no_id
|
||||
@@ -65,8 +60,6 @@ test_object_lock_put_obj_lock_enable_after_create
|
||||
test_object_lock_put_obj_lock_invalid_bucket
|
||||
test_object_lock_put_obj_retention_invalid_bucket
|
||||
test_post_object_upload_checksum
|
||||
test_put_bucket_encryption_kms
|
||||
test_put_bucket_encryption_s3
|
||||
test_put_bucket_logging
|
||||
test_put_bucket_logging_errors
|
||||
test_put_bucket_logging_permissions
|
||||
@@ -116,6 +109,3 @@ test_bucket_policy_multipart
|
||||
test_bucket_policy_put_obj_grant
|
||||
test_bucket_policy_tenanted_bucket
|
||||
test_object_presigned_put_object_with_acl_tenant
|
||||
|
||||
# Object attributes
|
||||
test_get_checksum_object_attributes
|
||||
|
||||
Reference in New Issue
Block a user