support bitrot

Signed-off-by: junxiang Mu <1948535941@qq.com>
This commit is contained in:
junxiang Mu
2024-10-31 11:18:00 +08:00
parent c0e6751893
commit 640cf1caf3
5 changed files with 93 additions and 58 deletions

View File

@@ -6,7 +6,8 @@ use lazy_static::lazy_static;
use sha2::{digest::core_api::BlockSizeUser, Digest, Sha256};
use tokio::{
spawn,
sync::mpsc::{self, Sender}, task::JoinHandle,
sync::mpsc::{self, Sender},
task::JoinHandle,
};
use crate::{
@@ -33,7 +34,7 @@ lazy_static! {
// ];
const MAGIC_HIGHWAY_HASH256_KEY: &[u64; 4] = &[3, 4, 2, 1];
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum Hasher {
SHA256(Sha256),
HighwayHash256(HighwayHasher),
@@ -116,6 +117,7 @@ impl BitrotAlgorithm {
}
}
#[derive(Debug)]
pub struct BitrotVerifier {
_algorithm: BitrotAlgorithm,
_sum: Vec<u8>,
@@ -140,7 +142,7 @@ pub fn bitrot_algorithm_from_string(s: &str) -> BitrotAlgorithm {
BitrotAlgorithm::HighwayHash256S
}
type BitrotWriter = Box<dyn Write + Send>;
pub type BitrotWriter = Box<dyn Write + Send>;
pub async fn new_bitrot_writer(
disk: DiskStore,
@@ -159,7 +161,7 @@ pub async fn new_bitrot_writer(
Ok(Box::new(WholeBitrotWriter::new(disk, volume, file_path, algo, shard_size)))
}
type BitrotReader = Box<dyn ReadAt>;
pub type BitrotReader = Box<dyn ReadAt + Send>;
pub fn new_bitrot_reader(
disk: DiskStore,
@@ -177,6 +179,16 @@ pub fn new_bitrot_reader(
Box::new(WholeBitrotReader::new(disk, bucket, file_path, algo, till_offset, sum))
}
pub async fn close_bitrot_writers(writers: &mut [Option<BitrotWriter>]) -> Result<()> {
for w in writers.into_iter() {
if let Some(w) = w {
let _ = w.close().await?;
}
}
Ok(())
}
pub fn bitrot_writer_sum(w: &BitrotWriter) -> Vec<u8> {
if let Some(w) = w.as_any().downcast_ref::<WholeBitrotWriter>() {
return w.hash.clone().finalize();
@@ -245,6 +257,7 @@ impl Write for WholeBitrotWriter {
}
}
#[derive(Debug)]
pub struct WholeBitrotReader {
disk: DiskStore,
volume: String,
@@ -292,7 +305,7 @@ impl ReadAt for WholeBitrotReader {
struct StreamingBitrotWriter {
hasher: Hasher,
tx: Sender<Option<Vec<u8>>>,
task: JoinHandle<()>,
task: Option<JoinHandle<()>>,
}
impl StreamingBitrotWriter {
@@ -322,7 +335,11 @@ impl StreamingBitrotWriter {
}
});
Ok(StreamingBitrotWriter { hasher, tx, task })
Ok(StreamingBitrotWriter {
hasher,
tx,
task: Some(task),
})
}
}
@@ -339,20 +356,22 @@ impl Write for StreamingBitrotWriter {
self.hasher.reset();
self.hasher.update(&buf);
let hash_bytes = self.hasher.clone().finalize();
println!("hash_bytes len: {}, buf len: {}", hash_bytes.len(), buf.len());
let _ = self.tx.send(Some(hash_bytes)).await?;
let _ = self.tx.send(Some(buf.to_vec())).await?;
Ok(())
}
async fn close(self: Box<Self>) -> Result<()> {
async fn close(&mut self) -> Result<()> {
let _ = self.tx.send(None).await?;
let _ = self.task.await;
if let Some(task) = self.task.take() {
let _ = task.await; // 等待任务完成
}
Ok(())
}
}
#[derive(Debug)]
struct StreamingBitrotReader {
disk: DiskStore,
_data: Vec<u8>,
@@ -395,7 +414,6 @@ impl StreamingBitrotReader {
#[async_trait::async_trait]
impl ReadAt for StreamingBitrotReader {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)> {
println!("in read_at");
if offset % self.shard_size != 0 {
return Err(Error::new(DiskError::Unexpected));
}
@@ -404,9 +422,7 @@ impl ReadAt for StreamingBitrotReader {
let stream_offset = (offset / self.shard_size) * self.hasher.size() + offset;
let buf_len = self.till_offset - stream_offset;
let mut file = self.disk.read_file(&self.volume, &self.file_path).await?;
println!("stream_offset: {}, buf_len: {}", stream_offset, buf_len);
let (buf, _) = file.read_at(stream_offset, buf_len).await?;
println!("buf: {:?}, len: {}", buf, buf.len());
self.buf = buf;
}
if offset != self.curr_offset {
@@ -415,12 +431,10 @@ impl ReadAt for StreamingBitrotReader {
self.hash_bytes = self.buf.drain(0..self.hash_bytes.capacity()).collect();
let buf = self.buf.drain(0..length).collect::<Vec<_>>();
println!("self.buf: {:?}", self.buf);
self.hasher.reset();
self.hasher.update(&buf);
let actual = self.hasher.clone().finalize();
if actual != self.hash_bytes {
println!("except: {:?}, actual: {:?}", self.hash_bytes, actual);
return Err(Error::new(DiskError::FileCorrupt));
}
@@ -488,10 +502,8 @@ mod test {
}
if checksum != sum {
println!("failed: {:?}, expect: {:?}, actual: {:?}", algo, checksum, sum);
return Err(Error::new(DiskError::FileCorrupt));
}
println!("success: {:?}", algo);
}
Ok(())
@@ -500,6 +512,9 @@ mod test {
#[tokio::test]
async fn test_all_bitrot_algorithms() -> Result<()> {
for algo in BITROT_ALGORITHMS.keys() {
if *algo != BitrotAlgorithm::HighwayHash256S {
continue;
}
test_bitrot_reader_writer_algo(algo.clone()).await?;
}
@@ -528,10 +543,15 @@ mod test {
let mut reader = new_bitrot_reader(disk, b"", volume, file_path, 35, algo, &sum, 10);
let read_len = 10;
(_, _) = reader.read_at(0, read_len).await?;
(_, _) = reader.read_at(10, read_len).await?;
(_, _) = reader.read_at(20, read_len).await?;
(_, _) = reader.read_at(30, read_len / 2).await?;
let mut result: Vec<u8>;
(result, _) = reader.read_at(0, read_len).await?;
assert_eq!(result, b"aaaaaaaaaa");
(result, _) = reader.read_at(10, read_len).await?;
assert_eq!(result, b"aaaaaaaaaa");
(result, _) = reader.read_at(20, read_len).await?;
assert_eq!(result, b"aaaaaaaaaa");
(result, _) = reader.read_at(30, read_len / 2).await?;
assert_eq!(result, b"aaaaa");
Ok(())
}

View File

@@ -1,3 +1,4 @@
use crate::bitrot::{close_bitrot_writers, BitrotReader, BitrotWriter};
use crate::error::{Error, Result, StdError};
use crate::quorum::{object_op_ignored_errs, reduce_write_quorum_errs};
use bytes::Bytes;
@@ -15,13 +16,12 @@ use uuid::Uuid;
use crate::chunk_stream::ChunkedStream;
use crate::disk::error::DiskError;
use crate::disk::{FileReader, FileWriter};
pub struct Erasure {
data_shards: usize,
parity_shards: usize,
encoder: Option<ReedSolomon>,
block_size: usize,
pub block_size: usize,
_id: Uuid,
}
@@ -48,7 +48,7 @@ impl Erasure {
pub async fn encode<S>(
&self,
body: S,
writers: &mut [Option<FileWriter>],
writers: &mut [Option<BitrotWriter>],
// block_size: usize,
total_size: usize,
write_quorum: usize,
@@ -111,6 +111,8 @@ impl Erasure {
// debug!(" encode_data done shard block num {}", idx);
let _ = close_bitrot_writers(writers).await?;
Ok(total)
// loop {
@@ -124,7 +126,7 @@ impl Erasure {
pub async fn decode(
&self,
writer: &mut DuplexStream,
readers: Vec<Option<FileReader>>,
readers: Vec<Option<BitrotReader>>,
offset: usize,
length: usize,
total_length: usize,
@@ -302,7 +304,7 @@ impl Erasure {
}
// 算出每个分片大小
fn shard_size(&self, data_size: usize) -> usize {
pub fn shard_size(&self, data_size: usize) -> usize {
(data_size + self.data_shards - 1) / self.data_shards
}
// returns final erasure size from original size.
@@ -322,25 +324,37 @@ impl Erasure {
// }
// num_shards * self.shard_size(self.block_size)
}
pub fn shard_file_offset(&self, start_offset: usize, length: usize, total_length: usize) -> usize {
let shard_size = self.shard_size(self.block_size);
let shard_file_size = self.shard_file_size(total_length);
let end_shard = (start_offset + length) / self.block_size;
let mut till_offset = end_shard * shard_size + shard_size;
if till_offset > shard_file_size {
till_offset = shard_file_size;
}
till_offset
}
}
#[async_trait::async_trait]
pub trait Write {
fn as_any(&self) -> &dyn Any;
async fn write(&mut self, buf: &[u8]) -> Result<()>;
async fn close(self: Box<Self>) -> Result<()> {
async fn close(&mut self) -> Result<()> {
Ok(())
}
}
#[async_trait::async_trait]
pub trait ReadAt {
pub trait ReadAt: Debug {
async fn read_at(&mut self, offset: usize, length: usize) -> Result<(Vec<u8>, usize)>;
}
#[derive(Debug)]
pub struct ShardReader {
readers: Vec<Option<FileReader>>, // 磁盘
readers: Vec<Option<BitrotReader>>, // 磁盘
data_block_count: usize, // 总的分片数量
parity_block_count: usize,
shard_size: usize, // 每个分片的块大小 一次读取一块
@@ -349,7 +363,7 @@ pub struct ShardReader {
}
impl ShardReader {
pub fn new(readers: Vec<Option<FileReader>>, ec: &Erasure, offset: usize, total_length: usize) -> Self {
pub fn new(readers: Vec<Option<BitrotReader>>, ec: &Erasure, offset: usize, total_length: usize) -> Self {
Self {
readers,
data_block_count: ec.data_shards,
@@ -386,7 +400,7 @@ impl ShardReader {
continue;
}
let disk: &mut FileReader = disk.as_mut().unwrap();
let disk: &mut BitrotReader = disk.as_mut().unwrap();
futures.push(disk.read_at(self.offset, read_length));
}

View File

@@ -160,7 +160,7 @@ impl HealingTracker {
}
pub async fn reset_healing(&mut self) {
self.mu.write().await;
let _ = self.mu.write().await;
self.items_healed = 0;
self.items_failed = 0;
self.bytes_done = 0;
@@ -178,37 +178,37 @@ impl HealingTracker {
}
pub async fn get_last_update(&self) -> u64 {
self.mu.read().await;
let _ = self.mu.read().await;
self.last_update
}
pub async fn get_bucket(&self) -> String {
self.mu.read().await;
let _ = self.mu.read().await;
self.bucket.clone()
}
pub async fn set_bucket(&mut self, bucket: &str) {
self.mu.write().await;
let _ = self.mu.write().await;
self.bucket = bucket.to_string();
}
pub async fn get_object(&self) -> String {
self.mu.read().await;
let _ = self.mu.read().await;
self.object.clone()
}
pub async fn set_object(&mut self, object: &str) {
self.mu.write().await;
let _ = self.mu.write().await;
self.object = object.to_string();
}
pub async fn update_progress(&mut self, success: bool, skipped: bool, by: u64) {
self.mu.write().await;
let _ = self.mu.write().await;
if success {
self.items_healed += 1;
@@ -227,7 +227,7 @@ impl HealingTracker {
if healing(&disk.path().to_string_lossy().to_string()).await?.is_none() {
return Err(Error::from_string(format!("healingTracker: drive {} is not marked as healing", self.id)));
}
self.mu.write().await;
let _ = self.mu.write().await;
if self.id.is_empty() || self.pool_index.is_none() || self.set_index.is_none() || self.disk_index.is_none() {
self.id = disk.get_disk_id().await?.map_or("".to_string(), |id| id.to_string());
let disk_location = disk.get_disk_location();
@@ -241,7 +241,7 @@ impl HealingTracker {
}
pub async fn save(&mut self) -> Result<()> {
self.mu.write().await;
let _ = self.mu.write().await;
if self.pool_index.is_none() || self.set_index.is_none() || self.disk_index.is_none() {
let layer = new_object_layer_fn();
let lock = layer.read().await;
@@ -290,7 +290,7 @@ impl HealingTracker {
}
async fn is_healed(&self, bucket: &str) -> bool {
self.mu.read().await;
let _ = self.mu.read().await;
for v in self.healed_buckets.iter() {
if v == bucket {
return true;
@@ -301,7 +301,7 @@ impl HealingTracker {
}
async fn resume(&mut self) {
self.mu.write().await;
let _ = self.mu.write().await;
self.items_healed = self.resume_items_healed;
self.items_failed = self.resume_items_failed;
@@ -312,7 +312,7 @@ impl HealingTracker {
}
async fn bucket_done(&mut self, bucket: &str) {
self.mu.write().await;
let _ = self.mu.write().await;
self.resume_items_healed = self.items_healed;
self.resume_items_failed = self.items_failed;
@@ -326,7 +326,7 @@ impl HealingTracker {
}
async fn set_queue_buckets(&mut self, buckets: &[BucketInfo]) {
self.mu.write().await;
let _ = self.mu.write().await;
buckets.iter().for_each(|bucket| {
if !self.healed_buckets.contains(&bucket.name) {
@@ -336,7 +336,7 @@ impl HealingTracker {
}
pub async fn to_healing_disk(&self) -> HealingDisk {
self.mu.read().await;
let _ = self.mu.read().await;
HealingDisk {
id: self.id.clone(),

View File

@@ -13,7 +13,7 @@ use std::{
path::Path,
pin::Pin,
sync::Arc,
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::{
select, spawn,
@@ -202,7 +202,7 @@ impl HealSequence {
async fn stop(&self) {
let w = self.tx.write().await;
w.send(true);
let _ = w.send(true);
}
async fn push_heal_result_item(&self, r: &HealResultItem) -> Result<()> {
@@ -363,7 +363,7 @@ impl AllHealState {
}
async fn pop_heal_local_disks(&mut self, heal_local_disks: &[Endpoint]) {
self.mu.write().await;
let _ = self.mu.write().await;
self.heal_local_disks.retain(|k, _| {
if heal_local_disks.contains(k) {
@@ -383,14 +383,14 @@ impl AllHealState {
}
async fn update_heal_status(&mut self, tracker: &HealingTracker) {
self.mu.write().await;
tracker.mu.read().await;
let _ = self.mu.write().await;
let _ = tracker.mu.read().await;
self.heal_status.insert(tracker.id.clone(), tracker.clone());
}
async fn get_local_healing_disks(&self) -> HashMap<String, HealingDisk> {
self.mu.read().await;
let _ = self.mu.read().await;
let mut dst = HashMap::new();
for v in self.heal_status.values() {
@@ -401,7 +401,7 @@ impl AllHealState {
}
async fn get_heal_local_disk_endpoints(&self) -> Endpoints {
self.mu.read().await;
let _ = self.mu.read().await;
let mut endpoints = Vec::new();
self.heal_local_disks.iter().for_each(|(k, v)| {
@@ -414,13 +414,13 @@ impl AllHealState {
}
async fn set_disk_healing_status(&mut self, ep: Endpoint, healing: bool) {
self.mu.write().await;
let _ = self.mu.write().await;
self.heal_local_disks.insert(ep, healing);
}
async fn push_heal_local_disks(&mut self, heal_local_disks: &[Endpoint]) {
self.mu.write().await;
let _ = self.mu.write().await;
heal_local_disks.iter().for_each(|heal_local_disk| {
self.heal_local_disks.insert(heal_local_disk.clone(), false);
@@ -436,7 +436,7 @@ impl AllHealState {
}
}
_ = sleep(Duration::from_secs(5 * 60)) => {
self.mu.write().await;
let _ = self.mu.write().await;
let now = SystemTime::now();
let mut keys_to_reomve = Vec::new();
@@ -454,7 +454,7 @@ impl AllHealState {
}
async fn get_heal_sequence_by_token(&self, token: &str) -> (Option<HealSequence>, bool) {
self.mu.read().await;
let _ = self.mu.read().await;
for v in self.heal_seq_map.values() {
if v.client_token == token {
@@ -466,7 +466,7 @@ impl AllHealState {
}
async fn get_heal_sequence(&self, path: &str) -> Option<HealSequence> {
self.mu.read().await;
let _ = self.mu.read().await;
self.heal_seq_map.get(path).cloned()
}

View File

@@ -12,7 +12,6 @@ use http::HeaderMap;
use rmp_serde::Serializer;
use s3s::dto::StreamingBlob;
use serde::{Deserialize, Serialize};
use sha2::Sha256;
use time::OffsetDateTime;
use uuid::Uuid;
@@ -249,7 +248,7 @@ impl ErasureInfo {
}
}
ChecksumInfo {algorithm: BitrotAlgorithm::HighwayHash256S, ..Default::default()}
ChecksumInfo {algorithm: DEFAULT_BITROT_ALGO, ..Default::default()}
}
}
@@ -261,6 +260,8 @@ pub struct ChecksumInfo {
pub hash: Vec<u8>,
}
pub const DEFAULT_BITROT_ALGO: BitrotAlgorithm = BitrotAlgorithm::HighwayHash256S;
#[derive(Serialize, Deserialize, Debug, PartialEq, Default, Clone, Eq, Hash)]
// BitrotAlgorithm specifies a algorithm used for bitrot protection.
pub enum BitrotAlgorithm {