fs: internal flush fix (#21)

This commit is contained in:
bitful-pannul 2023-10-13 16:06:05 +03:00 committed by GitHub
parent 29cf3fb53c
commit 6c1a672cee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 43 deletions

View File

@ -335,6 +335,7 @@ impl Manifest {
{
if let ChunkLocation::Memory(offset) = location {
*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
in_memory_file.wal_chunks.push(start);
}
}
}
@ -347,6 +348,7 @@ impl Manifest {
}
}
}
in_memory_file.mem_chunks.clear();
}
memory_buffer.clear();
@ -358,7 +360,6 @@ impl Manifest {
// called from main, locks manifest
// other flush_to_wal gets buffer and others passed in.
// potentially unify with options.
let mut manifest = self.manifest.write().await;
let mut memory_buffer = self.memory_buffer.write().await;
let mut wal_file = self.wal_file.write().await;
@ -374,6 +375,7 @@ impl Manifest {
{
if let ChunkLocation::Memory(offset) = location {
*location = ChunkLocation::WAL(wal_length_before_flush + *offset);
in_memory_file.wal_chunks.push(start);
}
}
}
@ -386,10 +388,10 @@ impl Manifest {
}
}
}
in_memory_file.mem_chunks.clear();
}
memory_buffer.clear();
Ok(())
}
@ -948,8 +950,13 @@ impl Manifest {
let mut hash_index = self.hash_index.write().await;
let mut memory_buffer = self.memory_buffer.write().await;
let mut to_flush: Vec<(
FileIdentifier,
Vec<([u8; 32], u64, u64, ChunkLocation, bool)>,
)> = Vec::new();
for (file_id, in_memory_file) in manifest_lock.iter_mut() {
let mut chunks_to_flush: Vec<([u8; 32], u64, u64, ChunkLocation, bool)> = Vec::new();
for &start in &in_memory_file.mem_chunks {
if let Some((hash, length, location, encrypted)) = in_memory_file.chunks.get(&start)
{
@ -979,8 +986,14 @@ impl Manifest {
}
}
}
if !chunks_to_flush.is_empty() {
to_flush.push((file_id.clone(), chunks_to_flush));
}
}
for (hash, start, length, location, encrypted) in chunks_to_flush.iter() {
for (file_id, chunks) in to_flush.iter() {
let in_memory_file = manifest_lock.get_mut(file_id).unwrap();
for (hash, start, length, location, encrypted) in chunks.iter() {
let total_len = if *encrypted {
length + ENCRYPTION_OVERHEAD as u64
} else {
@ -1027,7 +1040,7 @@ impl Manifest {
client.put_object(req).await?;
} else {
let path = self.fs_directory_path.join(hex::encode(hash));
fs::write(path, &buffer).await?;
fs::write(path, buffer).await?;
}
// add a manifest entry with the new hash and removed wal_position
in_memory_file.chunks.insert(
@ -1039,56 +1052,54 @@ impl Manifest {
*encrypted,
),
);
chunk_hashes.insert(*hash, !self.cloud_enabled);
}
in_memory_file.mem_chunks.clear();
in_memory_file.wal_chunks.clear();
if !chunks_to_flush.is_empty() {
// add updated manifest entries to the manifest file
let entry = ManifestRecord::Backup(BackupEntry {
file: file_id.clone(),
chunks: in_memory_file
.chunks
.iter()
.map(|(&k, v)| {
let local = match v.2 {
ChunkLocation::ColdStorage(local) => local,
_ => true, // WAL is always local
};
(v.0, k, v.1, v.3, local)
})
.collect::<Vec<_>>(),
});
let serialized_entry = bincode::serialize(&entry).unwrap();
let entry_length = serialized_entry.len() as u64;
// chunks have been flushed, let's add a manifest entry.
let entry = ManifestRecord::Backup(BackupEntry {
file: file_id.clone(),
chunks: in_memory_file
.chunks
.iter()
.map(|(&k, v)| {
let local = match v.2 {
ChunkLocation::ColdStorage(local) => local,
_ => true, // WAL is always local
};
(v.0, k, v.1, v.3, local)
})
.collect::<Vec<_>>(),
});
let serialized_entry = bincode::serialize(&entry).unwrap();
let entry_length = serialized_entry.len() as u64;
let mut buffer = Vec::new();
buffer.extend_from_slice(&entry_length.to_le_bytes());
buffer.extend_from_slice(&serialized_entry);
manifest_file.write_all(&buffer).await?;
if self.cloud_enabled {
let (client, bucket) = self.s3_client.as_ref().unwrap();
let mut buffer = Vec::new();
buffer.extend_from_slice(&entry_length.to_le_bytes());
buffer.extend_from_slice(&serialized_entry);
manifest_file.seek(SeekFrom::Start(0)).await?;
manifest_file.read_to_end(&mut buffer).await?;
manifest_file.write_all(&buffer).await?;
if self.cloud_enabled {
let (client, bucket) = self.s3_client.as_ref().unwrap();
let mut buffer = Vec::new();
manifest_file.seek(SeekFrom::Start(0)).await?;
manifest_file.read_to_end(&mut buffer).await?;
let req = PutObjectRequest {
bucket: bucket.clone(),
key: "manifest.bin".to_string(),
body: Some(StreamingBody::from(buffer)),
..Default::default()
};
client.put_object(req).await?;
}
hash_index.insert(in_memory_file.hash(), file_id.clone());
let req = PutObjectRequest {
bucket: bucket.clone(),
key: "manifest.bin".to_string(),
body: Some(StreamingBody::from(buffer)),
..Default::default()
};
client.put_object(req).await?;
}
hash_index.insert(in_memory_file.hash(), file_id.clone());
}
// clear the WAL file and memory buffer
wal_file.set_len(0).await?;
memory_buffer.clear();
Ok(())
}

View File

@ -571,6 +571,7 @@ async fn handle_request(
action: "Write".into(),
});
};
// println!("fs: got write from {:?} with len {:?}", source.process, &payload.bytes.len());
let file_uuid = FileIdentifier::new_uuid();
match manifest.write(&file_uuid, &payload.bytes).await {
@ -712,6 +713,7 @@ async fn handle_request(
});
};
// println!("setting state for process {:?} with len {:?}", process_id, &payload.bytes.len());
let file = FileIdentifier::Process(process_id);
match manifest.write(&file, &payload.bytes).await {
Ok(_) => (),