diff --git a/ecstore/src/bitrot.rs b/ecstore/src/bitrot.rs index ee8ddba63..57744c6db 100644 --- a/ecstore/src/bitrot.rs +++ b/ecstore/src/bitrot.rs @@ -6,7 +6,8 @@ use lazy_static::lazy_static; use sha2::{digest::core_api::BlockSizeUser, Digest, Sha256}; use tokio::{ spawn, - sync::mpsc::{self, Sender}, task::JoinHandle, + sync::mpsc::{self, Sender}, + task::JoinHandle, }; use crate::{ @@ -33,7 +34,7 @@ lazy_static! { // ]; const MAGIC_HIGHWAY_HASH256_KEY: &[u64; 4] = &[3, 4, 2, 1]; -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum Hasher { SHA256(Sha256), HighwayHash256(HighwayHasher), @@ -116,6 +117,7 @@ impl BitrotAlgorithm { } } +#[derive(Debug)] pub struct BitrotVerifier { _algorithm: BitrotAlgorithm, _sum: Vec, @@ -140,7 +142,7 @@ pub fn bitrot_algorithm_from_string(s: &str) -> BitrotAlgorithm { BitrotAlgorithm::HighwayHash256S } -type BitrotWriter = Box; +pub type BitrotWriter = Box; pub async fn new_bitrot_writer( disk: DiskStore, @@ -159,7 +161,7 @@ pub async fn new_bitrot_writer( Ok(Box::new(WholeBitrotWriter::new(disk, volume, file_path, algo, shard_size))) } -type BitrotReader = Box; +pub type BitrotReader = Box; pub fn new_bitrot_reader( disk: DiskStore, @@ -177,6 +179,16 @@ pub fn new_bitrot_reader( Box::new(WholeBitrotReader::new(disk, bucket, file_path, algo, till_offset, sum)) } +pub async fn close_bitrot_writers(writers: &mut [Option]) -> Result<()> { + for w in writers.into_iter() { + if let Some(w) = w { + let _ = w.close().await?; + } + } + + Ok(()) +} + pub fn bitrot_writer_sum(w: &BitrotWriter) -> Vec { if let Some(w) = w.as_any().downcast_ref::() { return w.hash.clone().finalize(); @@ -245,6 +257,7 @@ impl Write for WholeBitrotWriter { } } +#[derive(Debug)] pub struct WholeBitrotReader { disk: DiskStore, volume: String, @@ -292,7 +305,7 @@ impl ReadAt for WholeBitrotReader { struct StreamingBitrotWriter { hasher: Hasher, tx: Sender>>, - task: JoinHandle<()>, + task: Option>, } impl StreamingBitrotWriter { @@ -322,7 +335,11 @@ impl StreamingBitrotWriter { } }); - Ok(StreamingBitrotWriter { hasher, tx, task }) + Ok(StreamingBitrotWriter { + hasher, + tx, + task: Some(task), + }) } } @@ -339,20 +356,22 @@ impl Write for StreamingBitrotWriter { self.hasher.reset(); self.hasher.update(&buf); let hash_bytes = self.hasher.clone().finalize(); - println!("hash_bytes len: {}, buf len: {}", hash_bytes.len(), buf.len()); let _ = self.tx.send(Some(hash_bytes)).await?; let _ = self.tx.send(Some(buf.to_vec())).await?; Ok(()) } - async fn close(self: Box) -> Result<()> { + async fn close(&mut self) -> Result<()> { let _ = self.tx.send(None).await?; - let _ = self.task.await; + if let Some(task) = self.task.take() { + let _ = task.await; // 等待任务完成 + } Ok(()) } } +#[derive(Debug)] struct StreamingBitrotReader { disk: DiskStore, _data: Vec, @@ -395,7 +414,6 @@ impl StreamingBitrotReader { #[async_trait::async_trait] impl ReadAt for StreamingBitrotReader { async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)> { - println!("in read_at"); if offset % self.shard_size != 0 { return Err(Error::new(DiskError::Unexpected)); } @@ -404,9 +422,7 @@ impl ReadAt for StreamingBitrotReader { let stream_offset = (offset / self.shard_size) * self.hasher.size() + offset; let buf_len = self.till_offset - stream_offset; let mut file = self.disk.read_file(&self.volume, &self.file_path).await?; - println!("stream_offset: {}, buf_len: {}", stream_offset, buf_len); let (buf, _) = file.read_at(stream_offset, buf_len).await?; - println!("buf: {:?}, len: {}", buf, buf.len()); self.buf = buf; } if offset != self.curr_offset { @@ -415,12 +431,10 @@ impl ReadAt for StreamingBitrotReader { self.hash_bytes = self.buf.drain(0..self.hash_bytes.capacity()).collect(); let buf = self.buf.drain(0..length).collect::>(); - println!("self.buf: {:?}", self.buf); self.hasher.reset(); self.hasher.update(&buf); let actual = self.hasher.clone().finalize(); if actual != self.hash_bytes { - println!("except: {:?}, actual: {:?}", self.hash_bytes, actual); return Err(Error::new(DiskError::FileCorrupt)); } @@ -488,10 +502,8 @@ mod test { } if checksum != sum { - println!("failed: {:?}, expect: {:?}, actual: {:?}", algo, checksum, sum); return Err(Error::new(DiskError::FileCorrupt)); } - println!("success: {:?}", algo); } Ok(()) @@ -500,6 +512,9 @@ mod test { #[tokio::test] async fn test_all_bitrot_algorithms() -> Result<()> { for algo in BITROT_ALGORITHMS.keys() { + if *algo != BitrotAlgorithm::HighwayHash256S { + continue; + } test_bitrot_reader_writer_algo(algo.clone()).await?; } @@ -528,10 +543,15 @@ mod test { let mut reader = new_bitrot_reader(disk, b"", volume, file_path, 35, algo, &sum, 10); let read_len = 10; - (_, _) = reader.read_at(0, read_len).await?; - (_, _) = reader.read_at(10, read_len).await?; - (_, _) = reader.read_at(20, read_len).await?; - (_, _) = reader.read_at(30, read_len / 2).await?; + let mut result: Vec; + (result, _) = reader.read_at(0, read_len).await?; + assert_eq!(result, b"aaaaaaaaaa"); + (result, _) = reader.read_at(10, read_len).await?; + assert_eq!(result, b"aaaaaaaaaa"); + (result, _) = reader.read_at(20, read_len).await?; + assert_eq!(result, b"aaaaaaaaaa"); + (result, _) = reader.read_at(30, read_len / 2).await?; + assert_eq!(result, b"aaaaa"); Ok(()) } diff --git a/ecstore/src/erasure.rs b/ecstore/src/erasure.rs index 5eecdd696..9ef335a1a 100644 --- a/ecstore/src/erasure.rs +++ b/ecstore/src/erasure.rs @@ -1,3 +1,4 @@ +use crate::bitrot::{close_bitrot_writers, BitrotReader, BitrotWriter}; use crate::error::{Error, Result, StdError}; use crate::quorum::{object_op_ignored_errs, reduce_write_quorum_errs}; use bytes::Bytes; @@ -15,13 +16,12 @@ use uuid::Uuid; use crate::chunk_stream::ChunkedStream; use crate::disk::error::DiskError; -use crate::disk::{FileReader, FileWriter}; pub struct Erasure { data_shards: usize, parity_shards: usize, encoder: Option, - block_size: usize, + pub block_size: usize, _id: Uuid, } @@ -48,7 +48,7 @@ impl Erasure { pub async fn encode( &self, body: S, - writers: &mut [Option], + writers: &mut [Option], // block_size: usize, total_size: usize, write_quorum: usize, @@ -111,6 +111,8 @@ impl Erasure { // debug!(" encode_data done shard block num {}", idx); + let _ = close_bitrot_writers(writers).await?; + Ok(total) // loop { @@ -124,7 +126,7 @@ impl Erasure { pub async fn decode( &self, writer: &mut DuplexStream, - readers: Vec>, + readers: Vec>, offset: usize, length: usize, total_length: usize, @@ -302,7 +304,7 @@ impl Erasure { } // 算出每个分片大小 - fn shard_size(&self, data_size: usize) -> usize { + pub fn shard_size(&self, data_size: usize) -> usize { (data_size + self.data_shards - 1) / self.data_shards } // returns final erasure size from original size. @@ -322,25 +324,37 @@ impl Erasure { // } // num_shards * self.shard_size(self.block_size) } + + pub fn shard_file_offset(&self, start_offset: usize, length: usize, total_length: usize) -> usize { + let shard_size = self.shard_size(self.block_size); + let shard_file_size = self.shard_file_size(total_length); + let end_shard = (start_offset + length) / self.block_size; + let mut till_offset = end_shard * shard_size + shard_size; + if till_offset > shard_file_size { + till_offset = shard_file_size; + } + + till_offset + } } #[async_trait::async_trait] pub trait Write { fn as_any(&self) -> &dyn Any; async fn write(&mut self, buf: &[u8]) -> Result<()>; - async fn close(self: Box) -> Result<()> { + async fn close(&mut self) -> Result<()> { Ok(()) } } #[async_trait::async_trait] -pub trait ReadAt { +pub trait ReadAt: Debug { async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec, usize)>; } #[derive(Debug)] pub struct ShardReader { - readers: Vec>, // 磁盘 + readers: Vec>, // 磁盘 data_block_count: usize, // 总的分片数量 parity_block_count: usize, shard_size: usize, // 每个分片的块大小 一次读取一块 @@ -349,7 +363,7 @@ pub struct ShardReader { } impl ShardReader { - pub fn new(readers: Vec>, ec: &Erasure, offset: usize, total_length: usize) -> Self { + pub fn new(readers: Vec>, ec: &Erasure, offset: usize, total_length: usize) -> Self { Self { readers, data_block_count: ec.data_shards, @@ -386,7 +400,7 @@ impl ShardReader { continue; } - let disk: &mut FileReader = disk.as_mut().unwrap(); + let disk: &mut BitrotReader = disk.as_mut().unwrap(); futures.push(disk.read_at(self.offset, read_length)); } diff --git a/ecstore/src/heal/heal_commands.rs b/ecstore/src/heal/heal_commands.rs index 1c1724ecf..29d9bd550 100644 --- a/ecstore/src/heal/heal_commands.rs +++ b/ecstore/src/heal/heal_commands.rs @@ -160,7 +160,7 @@ impl HealingTracker { } pub async fn reset_healing(&mut self) { - self.mu.write().await; + let _ = self.mu.write().await; self.items_healed = 0; self.items_failed = 0; self.bytes_done = 0; @@ -178,37 +178,37 @@ impl HealingTracker { } pub async fn get_last_update(&self) -> u64 { - self.mu.read().await; + let _ = self.mu.read().await; self.last_update } pub async fn get_bucket(&self) -> String { - self.mu.read().await; + let _ = self.mu.read().await; self.bucket.clone() } pub async fn set_bucket(&mut self, bucket: &str) { - self.mu.write().await; + let _ = self.mu.write().await; self.bucket = bucket.to_string(); } pub async fn get_object(&self) -> String { - self.mu.read().await; + let _ = self.mu.read().await; self.object.clone() } pub async fn set_object(&mut self, object: &str) { - self.mu.write().await; + let _ = self.mu.write().await; self.object = object.to_string(); } pub async fn update_progress(&mut self, success: bool, skipped: bool, by: u64) { - self.mu.write().await; + let _ = self.mu.write().await; if success { self.items_healed += 1; @@ -227,7 +227,7 @@ impl HealingTracker { if healing(&disk.path().to_string_lossy().to_string()).await?.is_none() { return Err(Error::from_string(format!("healingTracker: drive {} is not marked as healing", self.id))); } - self.mu.write().await; + let _ = self.mu.write().await; if self.id.is_empty() || self.pool_index.is_none() || self.set_index.is_none() || self.disk_index.is_none() { self.id = disk.get_disk_id().await?.map_or("".to_string(), |id| id.to_string()); let disk_location = disk.get_disk_location(); @@ -241,7 +241,7 @@ impl HealingTracker { } pub async fn save(&mut self) -> Result<()> { - self.mu.write().await; + let _ = self.mu.write().await; if self.pool_index.is_none() || self.set_index.is_none() || self.disk_index.is_none() { let layer = new_object_layer_fn(); let lock = layer.read().await; @@ -290,7 +290,7 @@ impl HealingTracker { } async fn is_healed(&self, bucket: &str) -> bool { - self.mu.read().await; + let _ = self.mu.read().await; for v in self.healed_buckets.iter() { if v == bucket { return true; @@ -301,7 +301,7 @@ impl HealingTracker { } async fn resume(&mut self) { - self.mu.write().await; + let _ = self.mu.write().await; self.items_healed = self.resume_items_healed; self.items_failed = self.resume_items_failed; @@ -312,7 +312,7 @@ impl HealingTracker { } async fn bucket_done(&mut self, bucket: &str) { - self.mu.write().await; + let _ = self.mu.write().await; self.resume_items_healed = self.items_healed; self.resume_items_failed = self.items_failed; @@ -326,7 +326,7 @@ impl HealingTracker { } async fn set_queue_buckets(&mut self, buckets: &[BucketInfo]) { - self.mu.write().await; + let _ = self.mu.write().await; buckets.iter().for_each(|bucket| { if !self.healed_buckets.contains(&bucket.name) { @@ -336,7 +336,7 @@ impl HealingTracker { } pub async fn to_healing_disk(&self) -> HealingDisk { - self.mu.read().await; + let _ = self.mu.read().await; HealingDisk { id: self.id.clone(), diff --git a/ecstore/src/heal/heal_ops.rs b/ecstore/src/heal/heal_ops.rs index d0ef9e555..c75a15e4b 100644 --- a/ecstore/src/heal/heal_ops.rs +++ b/ecstore/src/heal/heal_ops.rs @@ -13,7 +13,7 @@ use std::{ path::Path, pin::Pin, sync::Arc, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use tokio::{ select, spawn, @@ -202,7 +202,7 @@ impl HealSequence { async fn stop(&self) { let w = self.tx.write().await; - w.send(true); + let _ = w.send(true); } async fn push_heal_result_item(&self, r: &HealResultItem) -> Result<()> { @@ -363,7 +363,7 @@ impl AllHealState { } async fn pop_heal_local_disks(&mut self, heal_local_disks: &[Endpoint]) { - self.mu.write().await; + let _ = self.mu.write().await; self.heal_local_disks.retain(|k, _| { if heal_local_disks.contains(k) { @@ -383,14 +383,14 @@ impl AllHealState { } async fn update_heal_status(&mut self, tracker: &HealingTracker) { - self.mu.write().await; - tracker.mu.read().await; + let _ = self.mu.write().await; + let _ = tracker.mu.read().await; self.heal_status.insert(tracker.id.clone(), tracker.clone()); } async fn get_local_healing_disks(&self) -> HashMap { - self.mu.read().await; + let _ = self.mu.read().await; let mut dst = HashMap::new(); for v in self.heal_status.values() { @@ -401,7 +401,7 @@ impl AllHealState { } async fn get_heal_local_disk_endpoints(&self) -> Endpoints { - self.mu.read().await; + let _ = self.mu.read().await; let mut endpoints = Vec::new(); self.heal_local_disks.iter().for_each(|(k, v)| { @@ -414,13 +414,13 @@ impl AllHealState { } async fn set_disk_healing_status(&mut self, ep: Endpoint, healing: bool) { - self.mu.write().await; + let _ = self.mu.write().await; self.heal_local_disks.insert(ep, healing); } async fn push_heal_local_disks(&mut self, heal_local_disks: &[Endpoint]) { - self.mu.write().await; + let _ = self.mu.write().await; heal_local_disks.iter().for_each(|heal_local_disk| { self.heal_local_disks.insert(heal_local_disk.clone(), false); @@ -436,7 +436,7 @@ impl AllHealState { } } _ = sleep(Duration::from_secs(5 * 60)) => { - self.mu.write().await; + let _ = self.mu.write().await; let now = SystemTime::now(); let mut keys_to_reomve = Vec::new(); @@ -454,7 +454,7 @@ impl AllHealState { } async fn get_heal_sequence_by_token(&self, token: &str) -> (Option, bool) { - self.mu.read().await; + let _ = self.mu.read().await; for v in self.heal_seq_map.values() { if v.client_token == token { @@ -466,7 +466,7 @@ impl AllHealState { } async fn get_heal_sequence(&self, path: &str) -> Option { - self.mu.read().await; + let _ = self.mu.read().await; self.heal_seq_map.get(path).cloned() } diff --git a/ecstore/src/store_api.rs b/ecstore/src/store_api.rs index 51005a071..44910a4dc 100644 --- a/ecstore/src/store_api.rs +++ b/ecstore/src/store_api.rs @@ -12,7 +12,6 @@ use http::HeaderMap; use rmp_serde::Serializer; use s3s::dto::StreamingBlob; use serde::{Deserialize, Serialize}; -use sha2::Sha256; use time::OffsetDateTime; use uuid::Uuid; @@ -249,7 +248,7 @@ impl ErasureInfo { } } - ChecksumInfo {algorithm: BitrotAlgorithm::HighwayHash256S, ..Default::default()} + ChecksumInfo {algorithm: DEFAULT_BITROT_ALGO, ..Default::default()} } } @@ -261,6 +260,8 @@ pub struct ChecksumInfo { pub hash: Vec, } +pub const DEFAULT_BITROT_ALGO: BitrotAlgorithm = BitrotAlgorithm::HighwayHash256S; + #[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Hash)] // BitrotAlgorithm specifies a algorithm used for bitrot protection. pub enum BitrotAlgorithm {