bulkops: add trait ChangesetBulkFetch

Summary:
This allows for more flexibility in structuring the code that wants to read all
the public changesets.
The usecase I have in mind is the SegmentedChangelog Seeder. The logic is
defined in the segmented_changelog crate. Constructing the Seeder is more
straight forward if it doesn't have to take direct dependency on SqlPhases and
SqlChangesets.

Reviewed By: quark-zju

Differential Revision: D24096966

fbshipit-source-id: dffa909cd27d6c05d745fd0fe0609114a50f1892
This commit is contained in:
Stefan Filip 2020-10-08 09:41:16 -07:00 committed by Facebook GitHub Bot
parent 6e2ec8b1ca
commit 9d9a928c4c

View File

@ -11,19 +11,52 @@
///! ///!
///! Utiltities for handling data in bulk. ///! Utiltities for handling data in bulk.
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use futures::{ use futures::{
compat::{Future01CompatExt, Stream01CompatExt}, compat::{Future01CompatExt, Stream01CompatExt},
future::{try_join, TryFutureExt}, future::{try_join, TryFutureExt},
stream::{self, StreamExt, TryStreamExt}, stream::{self, BoxStream, StreamExt, TryStreamExt},
Stream, Stream,
}; };
use changesets::{ChangesetEntry, Changesets, SqlChangesets}; use changesets::{ChangesetEntry, Changesets, SqlChangesets};
use context::CoreContext; use context::CoreContext;
use mononoke_types::RepositoryId; use mononoke_types::RepositoryId;
use phases::SqlPhases; use phases::{Phases, SqlPhases};
pub trait ChangesetBulkFetch: Send + Sync {
fn fetch<'a>(&'a self, ctx: &'a CoreContext) -> BoxStream<'a, Result<ChangesetEntry, Error>>;
}
pub struct PublicChangesetBulkFetch {
repo_id: RepositoryId,
changesets: Arc<dyn Changesets>,
phases: Arc<dyn Phases>,
}
impl ChangesetBulkFetch for PublicChangesetBulkFetch {
fn fetch<'a>(&'a self, ctx: &'a CoreContext) -> BoxStream<'a, Result<ChangesetEntry, Error>> {
let sql_changesets = self.changesets.get_sql_changesets();
let sql_phases = self.phases.get_sql_phases();
fetch_all_public_changesets(ctx, self.repo_id, sql_changesets, sql_phases).boxed()
}
}
impl PublicChangesetBulkFetch {
pub fn new(
repo_id: RepositoryId,
changesets: Arc<dyn Changesets>,
phases: Arc<dyn Phases>,
) -> Self {
Self {
repo_id,
changesets,
phases,
}
}
}
// This function is not optimal since it could be made faster by doing more processing // This function is not optimal since it could be made faster by doing more processing
// on XDB side, but for the puprpose of this binary it is good enough // on XDB side, but for the puprpose of this binary it is good enough