fix(ecstore): preserve transition object metadata (#2263)

Co-authored-by: houseme <housemecn@gmail.com>
Co-authored-by: heihutu <heihutu@gmail.com>
This commit is contained in:
cxymds
2026-03-24 14:06:29 +08:00
committed by GitHub
parent 75e6902f46
commit 5ea6d8a7e6
11 changed files with 287 additions and 128 deletions

View File

@@ -82,6 +82,9 @@ use rustfs_rio::{EtagResolvable, HashReader, HashReaderMut, TryGetIndex as _, Wa
use rustfs_s3_common::EventName;
use rustfs_utils::http::headers::AMZ_OBJECT_TAGGING;
use rustfs_utils::http::headers::AMZ_STORAGE_CLASS;
use rustfs_utils::http::headers::{
CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_TYPE, EXPIRES, HeaderExt as _,
};
use rustfs_utils::http::{
SUFFIX_ACTUAL_OBJECT_SIZE_CAP, SUFFIX_ACTUAL_SIZE, SUFFIX_COMPRESSION, SUFFIX_COMPRESSION_SIZE, SUFFIX_REPLICATION_SSEC_CRC,
contains_key_str, get_header_map, get_str, insert_str, remove_header_map,
@@ -92,7 +95,7 @@ use rustfs_utils::{
path::{SLASH_SEPARATOR, encode_dir_object, has_suffix, path_join_buf},
};
use rustfs_workers::workers::Workers;
use s3s::header::X_AMZ_RESTORE;
use s3s::header::{X_AMZ_OBJECT_LOCK_LEGAL_HOLD, X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE, X_AMZ_RESTORE};
use sha2::{Digest, Sha256};
use std::hash::Hash;
use std::mem::{self};
@@ -1803,6 +1806,27 @@ impl ObjectOperations for SetDisks {
let dest_obj = dest_obj.unwrap();
let oi = ObjectInfo::from_file_info(&fi, bucket, object, opts.versioned || opts.version_suspended);
let mut transition_meta = oi.user_defined.clone();
transition_meta.insert("name".to_string(), object.to_string());
if let Some(content_type) = oi.content_type.as_ref().filter(|value| !value.is_empty()) {
transition_meta.insert(CONTENT_TYPE.to_ascii_lowercase(), content_type.clone());
}
for header in [
CONTENT_ENCODING,
CONTENT_LANGUAGE,
CONTENT_DISPOSITION,
CACHE_CONTROL,
EXPIRES,
X_AMZ_OBJECT_LOCK_MODE.as_str(),
X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str(),
X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str(),
] {
if let Some(value) = fi.metadata.lookup(header).filter(|value| !value.is_empty()) {
transition_meta.insert(header.to_ascii_lowercase(), value.to_string());
}
}
let (pr, mut pw) = tokio::io::duplex(fi.erasure.block_size);
let reader = ReaderImpl::ObjectBody(GetObjectReader {
@@ -1836,13 +1860,7 @@ impl ObjectOperations for SetDisks {
};
});
let rv = tgt_client
.put_with_meta(&dest_obj, reader, fi.size, {
let mut m = HashMap::<String, String>::new();
m.insert("name".to_string(), object.to_string());
m
})
.await;
let rv = tgt_client.put_with_meta(&dest_obj, reader, fi.size, transition_meta).await;
if let Err(err) = rv {
return Err(StorageError::Io(err));
}

View File

@@ -20,6 +20,7 @@
use crate::client::{
admin_handler_utils::AdminError,
api_put_object::{AdvancedPutOptions, PutObjectOptions},
transition_api::{ReadCloser, ReaderImpl},
};
use crate::error::is_err_bucket_not_found;
@@ -39,7 +40,17 @@ use crate::tier::{
};
use bytes::Bytes;
use http::StatusCode;
use rustfs_utils::http::headers::{
CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_TYPE, EXPIRES, HeaderExt as _,
};
use s3s::dto::{ObjectLockLegalHoldStatus, ObjectLockRetentionMode, ReplicationStatus};
use s3s::header::{
X_AMZ_OBJECT_LOCK_LEGAL_HOLD, X_AMZ_OBJECT_LOCK_MODE, X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE, X_AMZ_REPLICATION_STATUS,
X_AMZ_STORAGE_CLASS,
};
use std::collections::HashMap;
use time::OffsetDateTime;
use time::format_description::well_known::{Rfc2822, Rfc3339};
use tracing::{info, warn};
pub type WarmBackendImpl = Box<dyn WarmBackend + Send + Sync + 'static>;
@@ -67,6 +78,82 @@ pub trait WarmBackend {
async fn in_use(&self) -> Result<bool, std::io::Error>;
}
fn parse_http_timestamp(value: &str) -> Option<OffsetDateTime> {
OffsetDateTime::parse(value, &Rfc3339)
.or_else(|_| OffsetDateTime::parse(value, &Rfc2822))
.ok()
}
pub fn build_transition_put_options(storage_class: String, mut metadata: HashMap<String, String>) -> PutObjectOptions {
let mut opts = PutObjectOptions {
storage_class,
legalhold: ObjectLockLegalHoldStatus::from_static(""),
internal: AdvancedPutOptions {
replication_status: ReplicationStatus::from_static(""),
..Default::default()
},
..Default::default()
};
if let Some(content_type) = metadata.lookup(CONTENT_TYPE) {
opts.content_type = content_type.to_string();
}
if let Some(content_encoding) = metadata.lookup(CONTENT_ENCODING) {
opts.content_encoding = content_encoding.to_string();
}
if let Some(content_language) = metadata.lookup(CONTENT_LANGUAGE) {
opts.content_language = content_language.to_string();
}
if let Some(content_disposition) = metadata.lookup(CONTENT_DISPOSITION) {
opts.content_disposition = content_disposition.to_string();
}
if let Some(cache_control) = metadata.lookup(CACHE_CONTROL) {
opts.cache_control = cache_control.to_string();
}
if let Some(expires) = metadata.lookup(EXPIRES).and_then(parse_http_timestamp) {
opts.expires = expires;
}
if let Some(mode) = metadata.lookup(X_AMZ_OBJECT_LOCK_MODE.as_str()) {
opts.mode = ObjectLockRetentionMode::from(mode.to_ascii_uppercase());
}
if let Some(retain_until_date) = metadata
.lookup(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str())
.and_then(parse_http_timestamp)
{
opts.retain_until_date = retain_until_date;
}
if let Some(legalhold) = metadata.lookup(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str()) {
opts.legalhold = ObjectLockLegalHoldStatus::from(legalhold.to_ascii_uppercase());
}
for key in [
CONTENT_TYPE,
CONTENT_ENCODING,
CONTENT_LANGUAGE,
CONTENT_DISPOSITION,
CACHE_CONTROL,
EXPIRES,
X_AMZ_OBJECT_LOCK_MODE.as_str(),
X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.as_str(),
X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str(),
X_AMZ_REPLICATION_STATUS.as_str(),
X_AMZ_STORAGE_CLASS.as_str(),
] {
metadata.remove(key);
}
opts.user_metadata = metadata;
opts
}
pub async fn check_warm_backend(w: Option<&WarmBackendImpl>) -> Result<(), AdminError> {
let w = w.expect("err");
let remote_version_id = w
@@ -213,3 +300,54 @@ pub async fn new_warm_backend(tier: &TierConfig, probe: bool) -> Result<WarmBack
Ok(d.expect("err"))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn build_transition_put_options_preserves_content_headers() {
let mut metadata = HashMap::new();
metadata.insert("content-type".to_string(), "text/plain".to_string());
metadata.insert("content-encoding".to_string(), "gzip".to_string());
metadata.insert("cache-control".to_string(), "max-age=60".to_string());
let opts = build_transition_put_options("COLD".to_string(), metadata);
assert_eq!(opts.content_type, "text/plain");
assert_eq!(opts.content_encoding, "gzip");
assert_eq!(opts.cache_control, "max-age=60");
assert_eq!(opts.internal.replication_status.as_str(), "");
assert_eq!(opts.legalhold.as_str(), "");
}
#[test]
fn build_transition_put_options_preserves_object_lock_headers_when_present() {
let mut metadata = HashMap::new();
metadata.insert(X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.to_string(), "2026-03-23T00:00:00Z".to_string());
metadata.insert(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.to_string(), ObjectLockLegalHoldStatus::ON.to_string());
metadata.insert(X_AMZ_OBJECT_LOCK_MODE.to_string(), ObjectLockRetentionMode::GOVERNANCE.to_string());
let opts = build_transition_put_options("COLD".to_string(), metadata);
assert_eq!(opts.mode.as_str(), ObjectLockRetentionMode::GOVERNANCE);
assert_eq!(opts.legalhold.as_str(), ObjectLockLegalHoldStatus::ON);
assert_ne!(opts.retain_until_date, OffsetDateTime::UNIX_EPOCH);
}
#[test]
fn build_transition_put_options_filters_promoted_headers_from_user_metadata() {
let mut metadata = HashMap::new();
metadata.insert("name".to_string(), "object".to_string());
metadata.insert(CONTENT_TYPE.to_string(), "text/plain".to_string());
metadata.insert(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.to_string(), ObjectLockLegalHoldStatus::ON.to_string());
metadata.insert(X_AMZ_REPLICATION_STATUS.to_string(), "PENDING".to_string());
let opts = build_transition_put_options("COLD".to_string(), metadata);
assert_eq!(opts.user_metadata.get("name"), Some(&"object".to_string()));
assert!(!opts.user_metadata.contains_key(CONTENT_TYPE));
assert!(!opts.user_metadata.contains_key(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.as_str()));
assert!(!opts.user_metadata.contains_key(X_AMZ_REPLICATION_STATUS.as_str()));
}
}

View File

@@ -29,7 +29,7 @@ use crate::client::{
};
use crate::tier::{
tier_config::TierAliyun,
warm_backend::{WarmBackend, WarmBackendGetOpts},
warm_backend::{WarmBackend, WarmBackendGetOpts, build_transition_put_options},
warm_backend_s3::WarmBackendS3,
};
use tracing::warn;
@@ -107,19 +107,12 @@ impl WarmBackend for WarmBackendAliyun {
let part_size = optimal_part_size(length)?;
let client = self.0.client.clone();
let res = client
.put_object(
&self.0.bucket,
&self.0.get_dest(object),
r,
length,
&PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &{
let mut opts = build_transition_put_options(self.0.storage_class.clone(), meta);
opts.part_size = part_size as u64;
opts.disable_content_sha256 = true;
opts
})
.await?;
//self.ToObjectError(err, object)
Ok(res.version_id)

View File

@@ -29,7 +29,7 @@ use crate::client::{
};
use crate::tier::{
tier_config::TierAzure,
warm_backend::{WarmBackend, WarmBackendGetOpts},
warm_backend::{WarmBackend, WarmBackendGetOpts, build_transition_put_options},
warm_backend_s3::WarmBackendS3,
};
use tracing::warn;
@@ -107,19 +107,12 @@ impl WarmBackend for WarmBackendAzure {
let part_size = optimal_part_size(length)?;
let client = self.0.client.clone();
let res = client
.put_object(
&self.0.bucket,
&self.0.get_dest(object),
r,
length,
&PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &{
let mut opts = build_transition_put_options(self.0.storage_class.clone(), meta);
opts.part_size = part_size as u64;
opts.disable_content_sha256 = true;
opts
})
.await?;
//self.ToObjectError(err, object)
Ok(res.version_id)

View File

@@ -29,7 +29,7 @@ use crate::client::{
};
use crate::tier::{
tier_config::TierHuaweicloud,
warm_backend::{WarmBackend, WarmBackendGetOpts},
warm_backend::{WarmBackend, WarmBackendGetOpts, build_transition_put_options},
warm_backend_s3::WarmBackendS3,
};
use tracing::warn;
@@ -107,19 +107,12 @@ impl WarmBackend for WarmBackendHuaweicloud {
let part_size = optimal_part_size(length)?;
let client = self.0.client.clone();
let res = client
.put_object(
&self.0.bucket,
&self.0.get_dest(object),
r,
length,
&PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &{
let mut opts = build_transition_put_options(self.0.storage_class.clone(), meta);
opts.part_size = part_size as u64;
opts.disable_content_sha256 = true;
opts
})
.await?;
//self.ToObjectError(err, object)
Ok(res.version_id)

View File

@@ -29,7 +29,7 @@ use crate::client::{
};
use crate::tier::{
tier_config::TierMinIO,
warm_backend::{WarmBackend, WarmBackendGetOpts},
warm_backend::{WarmBackend, WarmBackendGetOpts, build_transition_put_options},
warm_backend_s3::WarmBackendS3,
};
use tracing::warn;
@@ -106,19 +106,12 @@ impl WarmBackend for WarmBackendMinIO {
let part_size = optimal_part_size(length)?;
let client = self.0.client.clone();
let res = client
.put_object(
&self.0.bucket,
&self.0.get_dest(object),
r,
length,
&PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &{
let mut opts = build_transition_put_options(self.0.storage_class.clone(), meta);
opts.part_size = part_size as u64;
opts.disable_content_sha256 = true;
opts
})
.await?;
//self.ToObjectError(err, object)
Ok(res.version_id)

View File

@@ -29,7 +29,7 @@ use crate::client::{
};
use crate::tier::{
tier_config::TierR2,
warm_backend::{WarmBackend, WarmBackendGetOpts},
warm_backend::{WarmBackend, WarmBackendGetOpts, build_transition_put_options},
warm_backend_s3::WarmBackendS3,
};
use tracing::warn;
@@ -106,19 +106,12 @@ impl WarmBackend for WarmBackendR2 {
let part_size = optimal_part_size(length)?;
let client = self.0.client.clone();
let res = client
.put_object(
&self.0.bucket,
&self.0.get_dest(object),
r,
length,
&PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &{
let mut opts = build_transition_put_options(self.0.storage_class.clone(), meta);
opts.part_size = part_size as u64;
opts.disable_content_sha256 = true;
opts
})
.await?;
//self.ToObjectError(err, object)
Ok(res.version_id)

View File

@@ -29,7 +29,7 @@ use crate::client::{
};
use crate::tier::{
tier_config::TierRustFS,
warm_backend::{WarmBackend, WarmBackendGetOpts},
warm_backend::{WarmBackend, WarmBackendGetOpts, build_transition_put_options},
warm_backend_s3::WarmBackendS3,
};
@@ -101,19 +101,12 @@ impl WarmBackend for WarmBackendRustFS {
let part_size = optimal_part_size(length)?;
let client = self.0.client.clone();
let res = client
.put_object(
&self.0.bucket,
&self.0.get_dest(object),
r,
length,
&PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &{
let mut opts = build_transition_put_options(self.0.storage_class.clone(), meta);
opts.part_size = part_size as u64;
opts.disable_content_sha256 = true;
opts
})
.await?;
//self.ToObjectError(err, object)
Ok(res.version_id)

View File

@@ -34,7 +34,7 @@ use crate::error::ErrorResponse;
use crate::error::error_resp_to_object_err;
use crate::tier::{
tier_config::TierS3,
warm_backend::{WarmBackend, WarmBackendGetOpts},
warm_backend::{WarmBackend, WarmBackendGetOpts, build_transition_put_options},
};
use rustfs_utils::path::SLASH_SEPARATOR;
@@ -128,18 +128,11 @@ impl WarmBackend for WarmBackendS3 {
) -> Result<String, std::io::Error> {
let client = self.client.clone();
let res = client
.put_object(
&self.bucket,
&self.get_dest(object),
r,
length,
&PutObjectOptions {
send_content_md5: true,
storage_class: self.storage_class.clone(),
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.bucket, &self.get_dest(object), r, length, &{
let mut opts = build_transition_put_options(self.storage_class.clone(), meta);
opts.send_content_md5 = true;
opts
})
.await?;
Ok(res.version_id)
}

View File

@@ -29,7 +29,7 @@ use crate::client::{
};
use crate::tier::{
tier_config::TierTencent,
warm_backend::{WarmBackend, WarmBackendGetOpts},
warm_backend::{WarmBackend, WarmBackendGetOpts, build_transition_put_options},
warm_backend_s3::WarmBackendS3,
};
use tracing::warn;
@@ -107,19 +107,12 @@ impl WarmBackend for WarmBackendTencent {
let part_size = optimal_part_size(length)?;
let client = self.0.client.clone();
let res = client
.put_object(
&self.0.bucket,
&self.0.get_dest(object),
r,
length,
&PutObjectOptions {
storage_class: self.0.storage_class.clone(),
part_size: part_size as u64,
disable_content_sha256: true,
user_metadata: meta,
..Default::default()
},
)
.put_object(&self.0.bucket, &self.0.get_dest(object), r, length, &{
let mut opts = build_transition_put_options(self.0.storage_class.clone(), meta);
opts.part_size = part_size as u64;
opts.disable_content_sha256 = true;
opts
})
.await?;
//self.ToObjectError(err, object)
Ok(res.version_id)

View File

@@ -26,7 +26,7 @@ use rustfs_ecstore::{
},
tier::{
tier_config::{TierConfig, TierMinIO, TierType},
warm_backend::{WarmBackend, WarmBackendGetOpts},
warm_backend::{WarmBackend, WarmBackendGetOpts, build_transition_put_options},
},
};
use rustfs_scanner::scanner::init_data_scanner;
@@ -374,14 +374,23 @@ async fn wait_for_object_absence(ecstore: &Arc<ECStore>, bucket: &str, object: &
}
}
#[derive(Clone, Default)]
struct MockStoredObject {
bytes: Vec<u8>,
metadata: HashMap<String, String>,
}
#[derive(Clone, Default)]
struct MockWarmBackend {
objects: Arc<Mutex<HashMap<String, Vec<u8>>>>,
objects: Arc<Mutex<HashMap<String, MockStoredObject>>>,
}
impl MockWarmBackend {
async fn put_bytes(&self, object: &str, bytes: Vec<u8>) -> String {
self.objects.lock().await.insert(object.to_string(), bytes);
async fn put_bytes(&self, object: &str, bytes: Vec<u8>, metadata: HashMap<String, String>) -> String {
self.objects
.lock()
.await
.insert(object.to_string(), MockStoredObject { bytes, metadata });
Uuid::new_v4().to_string()
}
@@ -401,7 +410,7 @@ impl MockWarmBackend {
impl WarmBackend for MockWarmBackend {
async fn put(&self, object: &str, r: ReaderImpl, _length: i64) -> Result<String, std::io::Error> {
let bytes = self.read_bytes(r).await?;
Ok(self.put_bytes(object, bytes).await)
Ok(self.put_bytes(object, bytes, HashMap::new()).await)
}
async fn put_with_meta(
@@ -409,17 +418,38 @@ impl WarmBackend for MockWarmBackend {
object: &str,
r: ReaderImpl,
_length: i64,
_meta: HashMap<String, String>,
meta: HashMap<String, String>,
) -> Result<String, std::io::Error> {
let bytes = self.read_bytes(r).await?;
Ok(self.put_bytes(object, bytes).await)
let opts = build_transition_put_options(String::new(), meta);
let mut metadata = opts.user_metadata.clone();
if !opts.content_type.is_empty() {
metadata.insert("content-type".to_string(), opts.content_type.clone());
}
if !opts.content_encoding.is_empty() {
metadata.insert("content-encoding".to_string(), opts.content_encoding.clone());
}
if !opts.cache_control.is_empty() {
metadata.insert("cache-control".to_string(), opts.cache_control.clone());
}
if !opts.internal.replication_status.as_str().is_empty() {
metadata.insert(
"x-amz-replication-status".to_string(),
opts.internal.replication_status.as_str().to_string(),
);
}
if !opts.legalhold.as_str().is_empty() {
metadata.insert("x-amz-object-lock-legal-hold".to_string(), opts.legalhold.as_str().to_string());
}
Ok(self.put_bytes(object, bytes, metadata).await)
}
async fn get(&self, object: &str, _rv: &str, opts: WarmBackendGetOpts) -> Result<ReadCloser, std::io::Error> {
let objects = self.objects.lock().await;
let Some(bytes) = objects.get(object) else {
let Some(stored) = objects.get(object) else {
return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "mock object not found"));
};
let bytes = &stored.bytes;
let start = opts.start_offset.max(0) as usize;
let end = if opts.length > 0 {
@@ -593,7 +623,21 @@ mod serial_tests {
.await
.expect("Failed to set lifecycle configuration");
upload_test_object(&ecstore, put_bucket.as_str(), put_object, put_payload).await;
let mut reader = PutObjReader::from_vec(put_payload.to_vec());
let mut metadata = HashMap::new();
metadata.insert("content-type".to_string(), "text/plain".to_string());
ecstore
.put_object(
put_bucket.as_str(),
put_object,
&mut reader,
&ObjectOptions {
user_defined: metadata,
..Default::default()
},
)
.await
.expect("Failed to upload transition metadata test object");
enqueue_transition_for_existing_objects(ecstore.clone(), put_bucket.as_str())
.await
@@ -606,6 +650,21 @@ mod serial_tests {
assert_eq!(put_info.transitioned_object.status, "complete");
assert_eq!(put_info.transitioned_object.tier, tier_name);
assert!(backend.objects.lock().await.contains_key(&put_info.transitioned_object.name));
{
let stored = backend.objects.lock().await;
let transitioned = stored
.get(&put_info.transitioned_object.name)
.expect("transitioned object should be present in mock backend");
assert_eq!(transitioned.metadata.get("content-type"), Some(&"text/plain".to_string()));
assert!(
!transitioned.metadata.contains_key("x-amz-replication-status"),
"transitioned objects must not inherit replication status defaults"
);
assert!(
!transitioned.metadata.contains_key("x-amz-object-lock-legal-hold"),
"transitioned objects must not invent object lock headers"
);
}
let multipart_bucket = format!("test-immediate-mpu-{}", &Uuid::new_v4().simple().to_string()[..8]);
let multipart_object = "test/multipart.txt";