refactor(app): inline multipart transition helper (#2451)

This commit is contained in:
安正超
2026-04-10 07:06:50 +08:00
committed by GitHub
parent 28d9bbbdcb
commit b8bfaccbc5

View File

@@ -68,14 +68,10 @@ use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::io::StreamReader;
use tracing::{info, instrument, warn};
use tracing::{instrument, warn};
use urlencoding::encode;
use uuid::Uuid;
async fn maybe_enqueue_transition_immediate(obj_info: &rustfs_ecstore::store_api::ObjectInfo, src: LcEventSrc) {
enqueue_transition_immediate(obj_info, src).await;
}
/// Returns InvalidRange error if CopySourceRange end exceeds the source object size.
/// Used by execute_upload_part_copy to reject out-of-bounds ranges per S3 spec.
fn validate_copy_source_range_not_exceeds(range_spec: &HTTPRangeSpec, object_size: i64) -> S3Result<()> {
@@ -327,30 +323,15 @@ impl DefaultMultipartUsecase {
}
}
// TDD: Get multipart info to extract encryption configuration before completing
info!(
"TDD: Attempting to get multipart info for bucket={}, key={}, upload_id={}",
bucket, key, upload_id
);
let multipart_info = store
.get_multipart_info(&bucket, &key, &upload_id, &ObjectOptions::default())
.await
.map_err(ApiError::from)?;
info!("TDD: Got multipart info successfully");
info!("TDD: Multipart info metadata: {:?}", multipart_info.user_defined);
// TDD: Extract encryption information from multipart upload metadata
let server_side_encryption = multipart_info
.user_defined
.get("x-amz-server-side-encryption")
.map(|s| ServerSideEncryption::from(s.clone()));
info!(
"TDD: Raw encryption from metadata: {:?} -> parsed: {:?}",
multipart_info.user_defined.get("x-amz-server-side-encryption"),
server_side_encryption
);
let ssekms_key_id = match server_side_encryption.as_ref() {
Some(sse) if sse.as_str() == ServerSideEncryption::AWS_KMS => multipart_info
@@ -360,11 +341,6 @@ impl DefaultMultipartUsecase {
_ => None,
};
info!(
"TDD: Extracted encryption info - SSE: {:?}, KMS Key: {:?}",
server_side_encryption, ssekms_key_id
);
let obj_info = store
.clone()
.complete_multipart_upload(&bucket, &key, &upload_id, uploaded_parts, &opts)
@@ -402,7 +378,7 @@ impl DefaultMultipartUsecase {
}
}
maybe_enqueue_transition_immediate(&obj_info, LcEventSrc::S3CompleteMultipartUpload).await;
enqueue_transition_immediate(&obj_info, LcEventSrc::S3CompleteMultipartUpload).await;
let raw_mpu_version = obj_info.version_id.map(|v| v.to_string());
let mpu_version = if BucketVersioningSys::prefix_enabled(&bucket, &key).await {
@@ -411,11 +387,6 @@ impl DefaultMultipartUsecase {
None
};
let mpu_version_for_event = mpu_version.clone();
info!(
"TDD: Creating output with SSE: {:?}, KMS Key: {:?}",
server_side_encryption, ssekms_key_id
);
let mut checksum_crc32 = input.checksum_crc32;
let mut checksum_crc32c = input.checksum_crc32c;
let mut checksum_sha1 = input.checksum_sha1;
@@ -476,11 +447,6 @@ impl DefaultMultipartUsecase {
checksum_type,
..Default::default()
};
info!(
"TDD: Created output: SSE={:?}, KMS={:?}",
output.server_side_encryption, output.ssekms_key_id
);
let mt2 = HashMap::new();
let replicate_options =
get_must_replicate_options(&mt2, "".to_string(), ReplicationStatusType::Empty, ReplicationType::Object, opts.clone());
@@ -491,10 +457,6 @@ impl DefaultMultipartUsecase {
warn!("need multipart replication");
schedule_replication(obj_info.clone(), store, dsc, ReplicationType::Object).await;
}
info!(
"TDD: About to return S3Response with output: SSE={:?}, KMS={:?}",
output.server_side_encryption, output.ssekms_key_id
);
// Set object info for event notification
helper = helper.object(obj_info);