mirror of
https://github.com/rustfs/rustfs.git
synced 2026-05-06 22:28:16 +08:00
fix(ecstore): refine lifecycle transition enqueue
This commit is contained in:
@@ -463,17 +463,41 @@ impl TransitionState {
|
||||
src: src.clone(),
|
||||
event: event.clone(),
|
||||
};
|
||||
select! {
|
||||
//_ -> t.ctx.Done() => (),
|
||||
_ = self.transition_tx.send(Some(task)) => (),
|
||||
else => {
|
||||
match src {
|
||||
LcEventSrc::S3PutObject | LcEventSrc::S3CopyObject | LcEventSrc::S3CompleteMultipartUpload => {
|
||||
self.missed_immediate_tasks.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
_ => ()
|
||||
if let Err(err) = self.transition_tx.send(Some(task)).await {
|
||||
warn!(
|
||||
bucket = %oi.bucket,
|
||||
object = %oi.name,
|
||||
source = ?src,
|
||||
error = ?err,
|
||||
"failed to queue transition task"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_queue_transition_task(&self, oi: &ObjectInfo, event: &lifecycle::Event, src: &LcEventSrc) -> bool {
|
||||
let task = TransitionTask {
|
||||
obj_info: oi.clone(),
|
||||
src: src.clone(),
|
||||
event: event.clone(),
|
||||
};
|
||||
match self.transition_tx.try_send(Some(task)) {
|
||||
Ok(()) => true,
|
||||
Err(err) => {
|
||||
if matches!(
|
||||
src,
|
||||
LcEventSrc::S3PutObject | LcEventSrc::S3CopyObject | LcEventSrc::S3CompleteMultipartUpload
|
||||
) {
|
||||
self.missed_immediate_tasks.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
},
|
||||
warn!(
|
||||
bucket = %oi.bucket,
|
||||
object = %oi.name,
|
||||
source = ?src,
|
||||
error = ?err,
|
||||
"failed to queue transition task without waiting"
|
||||
);
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -647,23 +671,48 @@ pub async fn validate_transition_tier(lc: &BucketLifecycleConfiguration) -> Resu
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn enqueue_transition_immediate(oi: &ObjectInfo, src: LcEventSrc) {
|
||||
let lc = GLOBAL_LifecycleSys.get(&oi.bucket).await;
|
||||
if !lc.is_none() {
|
||||
let event = lc.expect("err").eval(&oi.to_lifecycle_opts()).await;
|
||||
match event.action {
|
||||
IlmAction::TransitionAction | IlmAction::TransitionVersionAction => {
|
||||
if oi.delete_marker || oi.is_dir {
|
||||
return;
|
||||
}
|
||||
GLOBAL_TransitionState.queue_transition_task(oi, &event, &src).await;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
async fn transition_event_for_object(lc: &BucketLifecycleConfiguration, oi: &ObjectInfo) -> Option<lifecycle::Event> {
|
||||
if oi.delete_marker || oi.is_dir {
|
||||
return None;
|
||||
}
|
||||
|
||||
let event = lc.eval(&oi.to_lifecycle_opts()).await;
|
||||
match event.action {
|
||||
IlmAction::TransitionAction | IlmAction::TransitionVersionAction => Some(event),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn enqueue_transition_with_config(lc: &BucketLifecycleConfiguration, oi: &ObjectInfo, src: &LcEventSrc) {
|
||||
if let Some(event) = transition_event_for_object(lc, oi).await {
|
||||
GLOBAL_TransitionState.queue_transition_task(oi, &event, src).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_enqueue_transition_with_config(lc: &BucketLifecycleConfiguration, oi: &ObjectInfo, src: &LcEventSrc) {
|
||||
if let Some(event) = transition_event_for_object(lc, oi).await {
|
||||
let _ = GLOBAL_TransitionState.try_queue_transition_task(oi, &event, src);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn enqueue_transition_immediate(oi: &ObjectInfo, src: LcEventSrc) {
|
||||
let Some(lc) = GLOBAL_LifecycleSys.get(&oi.bucket).await else {
|
||||
return;
|
||||
};
|
||||
enqueue_transition_with_config(&lc, oi, &src).await;
|
||||
}
|
||||
|
||||
pub async fn try_enqueue_transition_immediate(oi: &ObjectInfo, src: LcEventSrc) {
|
||||
let Some(lc) = GLOBAL_LifecycleSys.get(&oi.bucket).await else {
|
||||
return;
|
||||
};
|
||||
try_enqueue_transition_with_config(&lc, oi, &src).await;
|
||||
}
|
||||
|
||||
pub async fn enqueue_transition_for_existing_objects(api: Arc<ECStore>, bucket: &str) -> Result<(), Error> {
|
||||
let Some(lc) = GLOBAL_LifecycleSys.get(bucket).await else {
|
||||
return Ok(());
|
||||
};
|
||||
let mut marker = None;
|
||||
let mut version_marker = None;
|
||||
|
||||
@@ -674,7 +723,7 @@ pub async fn enqueue_transition_for_existing_objects(api: Arc<ECStore>, bucket:
|
||||
.await?;
|
||||
|
||||
for object in &page.objects {
|
||||
enqueue_transition_immediate(object, LcEventSrc::Scanner).await;
|
||||
enqueue_transition_with_config(&lc, object, &LcEventSrc::Scanner).await;
|
||||
}
|
||||
|
||||
if !page.is_truncated {
|
||||
|
||||
@@ -776,9 +776,9 @@ impl LifecycleCalculate for Transition {
|
||||
return Some(date.into());
|
||||
}
|
||||
|
||||
match self.days {
|
||||
Some(days) => Some(expected_expiry_time(obj.mod_time.unwrap(), days)),
|
||||
None => obj.mod_time,
|
||||
match (self.days, obj.mod_time) {
|
||||
(Some(days), Some(mod_time)) => Some(expected_expiry_time(mod_time, days)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1335,6 +1335,42 @@ mod tests {
|
||||
assert_eq!(event.storage_class, "WARM");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn eval_inner_transitions_without_schedule_are_ignored() {
|
||||
let base_time = OffsetDateTime::from_unix_timestamp(1_000_000).unwrap();
|
||||
let lc = BucketLifecycleConfiguration {
|
||||
expiry_updated_at: None,
|
||||
rules: vec![LifecycleRule {
|
||||
status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
|
||||
expiration: None,
|
||||
abort_incomplete_multipart_upload: None,
|
||||
del_marker_expiration: None,
|
||||
filter: None,
|
||||
id: Some("transition-no-schedule".to_string()),
|
||||
noncurrent_version_expiration: None,
|
||||
noncurrent_version_transitions: None,
|
||||
prefix: None,
|
||||
transitions: Some(vec![Transition {
|
||||
days: None,
|
||||
date: None,
|
||||
storage_class: Some(TransitionStorageClass::from_static("WARM")),
|
||||
}]),
|
||||
}],
|
||||
};
|
||||
|
||||
let opts = ObjectOpts {
|
||||
name: "obj".to_string(),
|
||||
mod_time: Some(base_time),
|
||||
is_latest: true,
|
||||
transition_status: "".to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
let event = lc.eval_inner(&opts, base_time + Duration::days(1), 0).await;
|
||||
|
||||
assert_eq!(event.action, IlmAction::NoneAction);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
async fn eval_inner_expires_noncurrent_version_after_due() {
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
#![allow(clippy::map_entry)]
|
||||
|
||||
use crate::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc;
|
||||
use crate::bucket::lifecycle::bucket_lifecycle_ops::{enqueue_transition_immediate, init_background_expiry};
|
||||
use crate::bucket::lifecycle::bucket_lifecycle_ops::{init_background_expiry, try_enqueue_transition_immediate};
|
||||
use crate::bucket::metadata_sys::{self, set_bucket_metadata};
|
||||
use crate::bucket::utils::check_abort_multipart_args;
|
||||
use crate::bucket::utils::check_complete_multipart_args;
|
||||
@@ -135,7 +135,7 @@ async fn enqueue_transition_after_write(result: Result<ObjectInfo>, src: LcEvent
|
||||
if is_meta_bucketname(&object_info.bucket) {
|
||||
return Ok(object_info);
|
||||
}
|
||||
enqueue_transition_immediate(&object_info, src).await;
|
||||
try_enqueue_transition_immediate(&object_info, src).await;
|
||||
Ok(object_info)
|
||||
}
|
||||
|
||||
|
||||
@@ -128,21 +128,22 @@ fn validate_lifecycle_rule_status(rules: &[LifecycleRule]) -> Result<(), &'stati
|
||||
|
||||
fn lifecycle_has_transition_rules(config: &BucketLifecycleConfiguration) -> bool {
|
||||
config.rules.iter().any(|rule| {
|
||||
rule.transitions.as_ref().is_some_and(|transitions| {
|
||||
transitions.iter().any(|transition| {
|
||||
transition
|
||||
.storage_class
|
||||
.as_ref()
|
||||
.is_some_and(|storage_class| !storage_class.as_str().is_empty())
|
||||
})
|
||||
}) || rule.noncurrent_version_transitions.as_ref().is_some_and(|transitions| {
|
||||
transitions.iter().any(|transition| {
|
||||
transition
|
||||
.storage_class
|
||||
.as_ref()
|
||||
.is_some_and(|storage_class| !storage_class.as_str().is_empty())
|
||||
})
|
||||
})
|
||||
rule.status == ExpirationStatus::from_static(ExpirationStatus::ENABLED)
|
||||
&& (rule.transitions.as_ref().is_some_and(|transitions| {
|
||||
transitions.iter().any(|transition| {
|
||||
transition
|
||||
.storage_class
|
||||
.as_ref()
|
||||
.is_some_and(|storage_class| !storage_class.as_str().is_empty())
|
||||
})
|
||||
}) || rule.noncurrent_version_transitions.as_ref().is_some_and(|transitions| {
|
||||
transitions.iter().any(|transition| {
|
||||
transition
|
||||
.storage_class
|
||||
.as_ref()
|
||||
.is_some_and(|storage_class| !storage_class.as_str().is_empty())
|
||||
})
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1933,6 +1934,56 @@ mod tests {
|
||||
assert_eq!(validate_lifecycle_rule_status(&rules).unwrap_err(), ERR_LIFECYCLE_RULE_STATUS);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lifecycle_has_transition_rules_ignores_disabled_rules() {
|
||||
let config = BucketLifecycleConfiguration {
|
||||
expiry_updated_at: None,
|
||||
rules: vec![LifecycleRule {
|
||||
status: ExpirationStatus::from_static(ExpirationStatus::DISABLED),
|
||||
transitions: Some(vec![Transition {
|
||||
storage_class: Some(TransitionStorageClass::from_static("WARM")),
|
||||
days: Some(30),
|
||||
date: None,
|
||||
}]),
|
||||
abort_incomplete_multipart_upload: None,
|
||||
del_marker_expiration: None,
|
||||
expiration: None,
|
||||
filter: None,
|
||||
id: Some("disabled-transition".to_string()),
|
||||
noncurrent_version_expiration: None,
|
||||
noncurrent_version_transitions: None,
|
||||
prefix: None,
|
||||
}],
|
||||
};
|
||||
|
||||
assert!(!lifecycle_has_transition_rules(&config));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lifecycle_has_transition_rules_accepts_enabled_rules() {
|
||||
let config = BucketLifecycleConfiguration {
|
||||
expiry_updated_at: None,
|
||||
rules: vec![LifecycleRule {
|
||||
status: ExpirationStatus::from_static(ExpirationStatus::ENABLED),
|
||||
transitions: Some(vec![Transition {
|
||||
storage_class: Some(TransitionStorageClass::from_static("WARM")),
|
||||
days: Some(30),
|
||||
date: None,
|
||||
}]),
|
||||
abort_incomplete_multipart_upload: None,
|
||||
del_marker_expiration: None,
|
||||
expiration: None,
|
||||
filter: None,
|
||||
id: Some("enabled-transition".to_string()),
|
||||
noncurrent_version_expiration: None,
|
||||
noncurrent_version_transitions: None,
|
||||
prefix: None,
|
||||
}],
|
||||
};
|
||||
|
||||
assert!(lifecycle_has_transition_rules(&config));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_list_buckets_returns_internal_error_when_store_uninitialized() {
|
||||
let input = ListBucketsInput::builder().build().unwrap();
|
||||
|
||||
Reference in New Issue
Block a user