mononoke: compress file history

Summary:
This diff adds handling of "overflows", i.e. making sure that each FastlogBatch
doesn't store more than 60 entries.
The idea is that a FastlogBatch has "latest" entries and "previous_batches". A new
entry is prepended to "latest", but if "latest" already has more than 10 entries, a
new FastlogBatch is uploaded and prepended to "previous_batches". If
"previous_batches" then has more than 5 entries, the oldest batch is removed from
it.
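
As a rough illustration of that rule only (a minimal standalone sketch, not the actual Mononoke types or blobstore API; `Batch`, `CsId`, `BatchId`, and the `upload` callback are hypothetical stand-ins), the prepend-with-overflow logic looks roughly like this:

use std::collections::VecDeque;

const MAX_LATEST_LEN: usize = 10;
const MAX_BATCHES: usize = 5;

// Hypothetical stand-ins for ChangesetId and FastlogBatchId.
type CsId = u64;
type BatchId = u64;

#[derive(Clone, Default)]
struct Batch {
    latest: VecDeque<CsId>,
    previous_batches: VecDeque<BatchId>,
}

impl Batch {
    // Prepend a new entry; `upload` stands in for writing a flushed batch
    // to the blobstore and returning its id.
    fn prepend(&self, cs_id: CsId, upload: impl FnOnce(VecDeque<CsId>) -> BatchId) -> Batch {
        let mut new_batch = self.clone();
        if new_batch.latest.len() >= MAX_LATEST_LEN {
            // "latest" is full: flush it into a new previous batch...
            let flushed = std::mem::replace(&mut new_batch.latest, VecDeque::new());
            // ...dropping the oldest previous batch if there are already 5.
            if new_batch.previous_batches.len() >= MAX_BATCHES {
                new_batch.previous_batches.pop_back();
            }
            new_batch.previous_batches.push_front(upload(flushed));
        }
        new_batch.latest.push_front(cs_id);
        new_batch
    }
}

fn main() {
    let mut batch = Batch::default();
    for i in 0..100u64 {
        batch = batch.prepend(i, |_| i); // fake upload: just return an id
    }
    // The invariants the real tests below check:
    assert!(batch.latest.len() <= MAX_LATEST_LEN);
    assert!(batch.previous_batches.len() <= MAX_BATCHES);
}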

Note that merges are still not handled in this diff; this functionality will be added in the
next diffs.

Reviewed By: krallin

Differential Revision: D17160506

fbshipit-source-id: 956bc5320667c6c5511d2a567740a4f6ebd8ac1b
Stanislau Hlebik 2019-09-03 07:28:47 -07:00 committed by Facebook Github Bot
parent 0a317d9ea2
commit f50e19a70e
3 changed files with 227 additions and 48 deletions


@@ -38,9 +38,13 @@ pub(crate) fn create_new_batch(
if parent_batches.len() < 2 {
match parent_batches.get(0) {
Some(parent_batch) => parent_batch
.prepend_single_parent(ctx, blobstore, linknode)
.prepend_child_with_single_parent(ctx, blobstore, linknode)
.left_future(),
None => future::ok(FastlogBatch::new(linknode)).right_future(),
None => {
let mut d = VecDeque::new();
d.push_back((linknode, vec![]));
future::ok(FastlogBatch::new(d)).right_future()
}
}
} else {
// TODO(stash): handle merges as well
@@ -95,8 +99,11 @@ fn generate_fastlog_batch_key(unode_entry: Entry<ManifestUnodeId, FileUnodeId>)
format!("fastlogbatch.{}", key_part)
}
const MAX_LATEST_LEN: usize = 10;
const MAX_BATCHES: usize = 5;
#[derive(Clone)]
struct ParentOffset(i32);
pub(crate) struct ParentOffset(i32);
#[derive(Clone)]
pub struct FastlogBatch {
@@ -105,55 +112,102 @@ pub struct FastlogBatch {
}
impl FastlogBatch {
pub(crate) fn new(cs_id: ChangesetId) -> Self {
let mut latest = VecDeque::new();
latest.push_front((cs_id, vec![]));
FastlogBatch {
fn new(latest: VecDeque<(ChangesetId, Vec<ParentOffset>)>) -> Self {
Self {
latest,
previous_batches: VecDeque::new(),
}
}
pub(crate) fn prepend_single_parent(
// Prepending a child with a single parent is a special case - we only need to prepend one entry
// with ParentOffset(1).
pub(crate) fn prepend_child_with_single_parent(
&self,
_ctx: CoreContext,
_blobstore: Arc<dyn Blobstore>,
ctx: CoreContext,
blobstore: Arc<dyn Blobstore>,
cs_id: ChangesetId,
) -> impl Future<Item = FastlogBatch, Error = Error> {
let new_entry = (cs_id, vec![ParentOffset(1)]);
let mut new_batch = self.clone();
// If there's just one parent, then the latest commit in the parent batch is the parent,
// and offset to this parent is 1.
new_batch.latest.push_front((cs_id, vec![ParentOffset(1)]));
// TODO(stash): handle overflows
future::ok(new_batch)
if new_batch.latest.len() >= MAX_LATEST_LEN {
let previous_latest = std::mem::replace(&mut new_batch.latest, VecDeque::new());
let new_previous_batch = FastlogBatch::new(previous_latest);
new_previous_batch
.store(ctx.clone(), &blobstore)
.map(move |new_batch_id| {
if new_batch.previous_batches.len() >= MAX_BATCHES {
new_batch.previous_batches.pop_back();
}
new_batch.latest.push_front(new_entry);
new_batch.previous_batches.push_front(new_batch_id);
new_batch
})
.left_future()
} else {
new_batch.latest.push_front(new_entry);
future::ok(new_batch).right_future()
}
}
pub(crate) fn convert_to_list(
pub(crate) fn fetch_flattened(
&self,
_ctx: CoreContext,
_blobstore: Arc<dyn Blobstore>,
ctx: CoreContext,
blobstore: Arc<dyn Blobstore>,
) -> impl Future<Item = Vec<(ChangesetId, Vec<FastlogParent>)>, Error = Error> {
let mut res = vec![];
for (index, (cs_id, parent_offsets)) in self.latest.iter().enumerate() {
let mut batch_parents = vec![];
for offset in parent_offsets {
let parent_index = index + offset.0 as usize;
let batch_parent = match self.latest.get(parent_index) {
Some((p_cs_id, _)) => FastlogParent::Known(*p_cs_id),
None => FastlogParent::Unknown,
};
batch_parents.push(batch_parent);
self.fetch_raw_list(ctx, blobstore).map(|full_batch| {
let mut res = vec![];
for (index, (cs_id, parent_offsets)) in full_batch.iter().enumerate() {
let mut batch_parents = vec![];
for offset in parent_offsets {
let parent_index = index + offset.0 as usize;
let batch_parent = match full_batch.get(parent_index) {
Some((p_cs_id, _)) => FastlogParent::Known(*p_cs_id),
None => FastlogParent::Unknown,
};
batch_parents.push(batch_parent);
}
res.push((*cs_id, batch_parents));
}
res.push((*cs_id, batch_parents));
res
})
}
fn fetch_raw_list(
&self,
ctx: CoreContext,
blobstore: Arc<dyn Blobstore>,
) -> BoxFuture<Vec<(ChangesetId, Vec<ParentOffset>)>, Error> {
let mut v = vec![];
for p in self.previous_batches.iter() {
v.push(p.load(ctx.clone(), &blobstore).from_err().and_then({
cloned!(ctx, blobstore);
move |full_batch| full_batch.fetch_raw_list(ctx, blobstore)
}));
}
if !self.previous_batches.is_empty() {
// TODO(stash): handle previous_batches correctly
unimplemented!()
}
let mut res = vec![];
res.extend(self.latest.clone());
future::join_all(v)
.map(move |previous_batches| {
for p in previous_batches {
res.extend(p);
}
res
})
.boxify()
}
future::ok(res)
#[cfg(test)]
pub(crate) fn latest(&self) -> &VecDeque<(ChangesetId, Vec<ParentOffset>)> {
&self.latest
}
#[cfg(test)]
pub(crate) fn previous_batches(&self) -> &VecDeque<FastlogBatchId> {
&self.previous_batches
}
fn from_thrift(th: thrift::FastlogBatch) -> Result<FastlogBatch, Error> {
@@ -326,47 +380,59 @@ mod test {
use tokio::runtime::Runtime;
#[test]
fn convert_to_list_simple() {
fn fetch_flattened_simple() {
let ctx = CoreContext::test_mock();
let repo = linear::getrepo();
let mut rt = Runtime::new().unwrap();
let batch = FastlogBatch::new(ONES_CSID);
let mut d = VecDeque::new();
d.push_back((ONES_CSID, vec![]));
let batch = FastlogBatch::new(d);
let blobstore = Arc::new(repo.get_blobstore());
assert_eq!(
vec![(ONES_CSID, vec![])],
rt.block_on(batch.convert_to_list(ctx, blobstore)).unwrap()
rt.block_on(batch.fetch_flattened(ctx, blobstore)).unwrap()
);
}
#[test]
fn convert_to_list_prepend() {
fn fetch_flattened_prepend() {
let ctx = CoreContext::test_mock();
let repo = linear::getrepo();
let mut rt = Runtime::new().unwrap();
let batch = FastlogBatch::new(ONES_CSID);
let mut d = VecDeque::new();
d.push_back((ONES_CSID, vec![]));
let batch = FastlogBatch::new(d);
let blobstore = Arc::new(repo.get_blobstore());
assert_eq!(
vec![(ONES_CSID, vec![])],
rt.block_on(batch.convert_to_list(ctx.clone(), blobstore.clone()))
rt.block_on(batch.fetch_flattened(ctx.clone(), blobstore.clone()))
.unwrap()
);
let prepended = rt
.block_on(batch.prepend_single_parent(ctx.clone(), blobstore.clone(), TWOS_CSID))
.block_on(batch.prepend_child_with_single_parent(
ctx.clone(),
blobstore.clone(),
TWOS_CSID,
))
.unwrap();
assert_eq!(
vec![
(TWOS_CSID, vec![FastlogParent::Known(ONES_CSID)]),
(ONES_CSID, vec![])
],
rt.block_on(prepended.convert_to_list(ctx.clone(), blobstore.clone()))
rt.block_on(prepended.fetch_flattened(ctx.clone(), blobstore.clone()))
.unwrap()
);
let prepended = rt
.block_on(prepended.prepend_single_parent(ctx.clone(), blobstore.clone(), THREES_CSID))
.block_on(prepended.prepend_child_with_single_parent(
ctx.clone(),
blobstore.clone(),
THREES_CSID,
))
.unwrap();
assert_eq!(
vec![
@@ -374,7 +440,7 @@ mod test {
(TWOS_CSID, vec![FastlogParent::Known(ONES_CSID)]),
(ONES_CSID, vec![])
],
rt.block_on(prepended.convert_to_list(ctx, blobstore))
rt.block_on(prepended.fetch_flattened(ctx, blobstore))
.unwrap()
);
}


@@ -62,7 +62,7 @@ pub fn prefetch_history(
let blobstore = Arc::new(repo.get_blobstore());
fetch_fastlog_batch_by_unode_id(ctx.clone(), blobstore.clone(), unode_entry).and_then(
move |maybe_fastlog_batch| {
maybe_fastlog_batch.map(|fastlog_batch| fastlog_batch.convert_to_list(ctx, blobstore))
maybe_fastlog_batch.map(|fastlog_batch| fastlog_batch.fetch_flattened(ctx, blobstore))
},
)
}
@@ -248,12 +248,18 @@ impl BonsaiDerivedMapping for RootFastlogMapping {
#[cfg(test)]
mod tests {
use super::*;
use benchmark_lib::{GenManifest, GenSettings};
use blobrepo::save_bonsai_changesets;
use context::CoreContext;
use fixtures::{create_bonsai_changeset, linear};
use fixtures::{
create_bonsai_changeset, create_bonsai_changeset_with_files, linear, store_files,
};
use maplit::btreemap;
use mercurial_types::HgChangesetId;
use mononoke_types::{MPath, ManifestUnodeId};
use pretty_assertions::assert_eq;
use rand::SeedableRng;
use rand_xorshift::XorShiftRng;
use std::collections::{HashSet, VecDeque};
use std::str::FromStr;
use tokio::runtime::Runtime;
@@ -362,6 +368,84 @@ mod tests {
}
}
#[test]
fn test_derive_overflow() {
let mut rt = Runtime::new().unwrap();
let repo = linear::getrepo();
let ctx = CoreContext::test_mock();
let mut bonsais = vec![];
let mut parents = vec![];
for i in 1..60 {
let filename = String::from("1");
let content = format!("{}", i);
let stored_files = store_files(
ctx.clone(),
btreemap! { filename.as_str() => Some(content.as_str()) },
repo.clone(),
);
let bcs = create_bonsai_changeset_with_files(parents, stored_files);
let bcs_id = bcs.get_changeset_id();
bonsais.push(bcs);
parents = vec![bcs_id];
}
let latest = parents.get(0).unwrap();
rt.block_on(save_bonsai_changesets(bonsais, ctx.clone(), repo.clone()))
.unwrap();
verify_all_entries_for_commit(&mut rt, ctx, repo, *latest);
}
#[test]
fn test_random_repo() {
let mut rt = Runtime::new().unwrap();
let repo = linear::getrepo();
let ctx = CoreContext::test_mock();
let mut rng = XorShiftRng::seed_from_u64(0); // reproducible Rng
let gen_settings = GenSettings::default();
let mut changes_count = vec![];
changes_count.resize(100, 100);
let latest = rt
.block_on(GenManifest::new().gen_stack(
ctx.clone(),
repo.clone(),
&mut rng,
&gen_settings,
None,
changes_count,
))
.unwrap();
verify_all_entries_for_commit(&mut rt, ctx, repo, latest);
}
fn verify_all_entries_for_commit(
rt: &mut Runtime,
ctx: CoreContext,
repo: BlobRepo,
bcs_id: ChangesetId,
) {
let root_unode_mf_id =
derive_fastlog_batch_and_unode(rt, ctx.clone(), bcs_id.clone(), repo.clone());
let blobstore = Arc::new(repo.get_blobstore());
let entries = rt
.block_on(
root_unode_mf_id
.list_all_entries(ctx.clone(), blobstore.clone())
.collect(),
)
.unwrap();
for (path, entry) in entries {
println!("verifying: path: {:?} unode: {:?}", path, entry);
verify_list(rt, ctx.clone(), repo.clone(), entry);
}
}
fn derive_fastlog_batch_and_unode(
rt: &mut Runtime,
ctx: CoreContext,
@@ -414,7 +498,15 @@ mod tests {
.unwrap()
.expect("batch hasn't been generated yet");
rt.block_on(batch.convert_to_list(ctx, blobstore)).unwrap()
println!(
"batch for {:?}: latest size: {}, previous batches size: {}",
entry,
batch.latest().len(),
batch.previous_batches().len(),
);
assert!(batch.latest().len() <= 10);
assert!(batch.previous_batches().len() <= 5);
rt.block_on(batch.fetch_flattened(ctx, blobstore)).unwrap()
}
fn find_unode_history(
@@ -441,6 +533,9 @@ mod tests {
.block_on(unode_entry.get_linknode(ctx.clone(), repo.clone()))
.unwrap();
history.push(linknode);
if history.len() >= 60 {
break;
}
let parents = runtime
.block_on(unode_entry.get_parents(ctx.clone(), repo.clone()))
.unwrap();


@@ -23,7 +23,7 @@ use mononoke_types::{
use std::collections::BTreeMap;
use std::str::FromStr;
fn store_files(
pub fn store_files(
ctx: CoreContext,
files: BTreeMap<&str, Option<&str>>,
repo: BlobRepo,
@@ -1520,6 +1520,24 @@ pub fn create_bonsai_changeset_with_author(
.unwrap()
}
pub fn create_bonsai_changeset_with_files(
parents: Vec<ChangesetId>,
file_changes: BTreeMap<MPath, Option<FileChange>>,
) -> BonsaiChangeset {
BonsaiChangesetMut {
parents,
author: "author".to_string(),
author_date: DateTime::from_timestamp(0, 0).unwrap(),
committer: None,
committer_date: None,
message: "message".to_string(),
extra: btreemap! {},
file_changes,
}
.freeze()
.unwrap()
}
pub mod many_diamonds {
use super::*;