sapling/eden/mononoke/microwave/builder/changesets.rs
Mark Juggurnauth-Thomas d66e56c407 changesets: remember repo_id in changesets
Summary:
The changesets object is only valid to access the changesets of a single repo
(other repos may have different metadata database config), so it is pointless
for all methods to require the caller to provide the correct one.  Instead,
make the changesets object remember the repo id.

Reviewed By: krallin

Differential Revision: D27430611

fbshipit-source-id: bf2c398af2e5eb77c1c7c55a89752753020939ab
2021-04-29 06:11:20 -07:00

118 lines
3.2 KiB
Rust

/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use anyhow::Error;
use async_trait::async_trait;
use changesets::{ChangesetEntry, ChangesetInsert, Changesets, SortOrder};
use cloned::cloned;
use context::CoreContext;
use futures::channel::mpsc::Sender;
use futures::sink::SinkExt;
use futures::stream::BoxStream;
use mononoke_types::{
ChangesetId, ChangesetIdPrefix, ChangesetIdsResolvedFromPrefix, RepositoryId,
};
use std::sync::Arc;
/// A [`Changesets`] implementation that wraps another one and records lookups.
///
/// `get` calls are forwarded to `inner`, and every entry that is found is also
/// sent to `recorder`. The enumeration and cache-priming methods are forwarded
/// to `inner` without recording; `add`, `get_many` and `get_many_by_prefix`
/// panic with `unimplemented!`.
#[derive(Clone)]
pub struct MicrowaveChangesets {
// Repository id returned by `repo_id()`; read from `inner` in `new`.
repo_id: RepositoryId,
// Channel sender that receives a clone of each entry found by `get`.
recorder: Sender<ChangesetEntry>,
// Wrapped implementation that the supported calls are delegated to.
inner: Arc<dyn Changesets>,
}
impl MicrowaveChangesets {
    /// Builds a recording wrapper around `inner`.
    ///
    /// The repository id is read once from `inner.repo_id()` and stored in the
    /// new value; `recorder` is stored as the sender that `get` writes found
    /// entries to.
    pub fn new(recorder: Sender<ChangesetEntry>, inner: Arc<dyn Changesets>) -> Self {
        // Query the id before `inner` is moved into the struct.
        let repo_id = inner.repo_id();
        Self {
            repo_id,
            recorder,
            inner,
        }
    }
}
#[async_trait]
impl Changesets for MicrowaveChangesets {
    /// Returns the repository id stored in this wrapper.
    fn repo_id(&self) -> RepositoryId {
        self.repo_id
    }

    /// Unsupported: always panics via `unimplemented!`, naming the repo id.
    async fn add(&self, _ctx: CoreContext, _cs: ChangesetInsert) -> Result<bool, Error> {
        // Unexpected calls error out on purpose; the rationale is written up in
        // filenodes.rs, under MicrowaveFilenodes.
        unimplemented!(
            "MicrowaveChangesets: unexpected add in repo {}",
            self.repo_id
        )
    }

    /// Looks up `cs_id` through the wrapped implementation and records any hit.
    ///
    /// When the inner lookup yields an entry, its `repo_id` is asserted to equal
    /// this wrapper's `repo_id` (a mismatch panics) and a clone of the entry is
    /// sent to the recorder channel. The inner result is then returned as-is.
    /// Errors from the inner lookup or from the channel send are propagated.
    async fn get(
        &self,
        ctx: CoreContext,
        cs_id: ChangesetId,
    ) -> Result<Option<ChangesetEntry>, Error> {
        // Sending requires a mutable sender, so work on a per-call clone.
        cloned!(mut self.recorder);

        let looked_up = self.inner.get(ctx, cs_id).await?;
        if let Some(found) = looked_up.as_ref() {
            // NOTE: See MicrowaveFilenodes for context on this.
            assert_eq!(found.repo_id, self.repo_id);
            recorder.send(found.clone()).await?;
        }
        Ok(looked_up)
    }

    /// Unsupported: always panics via `unimplemented!`, naming the repo id.
    async fn get_many(
        &self,
        _ctx: CoreContext,
        _cs_ids: Vec<ChangesetId>,
    ) -> Result<Vec<ChangesetEntry>, Error> {
        unimplemented!(
            "MicrowaveChangesets: unexpected get_many in repo {}",
            self.repo_id
        )
    }

    /// Unsupported: always panics via `unimplemented!`, naming the repo id.
    async fn get_many_by_prefix(
        &self,
        _ctx: CoreContext,
        _cs_prefix: ChangesetIdPrefix,
        _limit: usize,
    ) -> Result<ChangesetIdsResolvedFromPrefix, Error> {
        unimplemented!(
            "MicrowaveChangesets: unexpected get_many_by_prefix in repo {}",
            self.repo_id
        )
    }

    /// Forwards cache priming to the wrapped implementation; nothing is recorded.
    fn prime_cache(&self, ctx: &CoreContext, changesets: &[ChangesetEntry]) {
        self.inner.prime_cache(ctx, changesets);
    }

    /// Forwards to the wrapped implementation's `enumeration_bounds` unchanged.
    async fn enumeration_bounds(
        &self,
        ctx: &CoreContext,
        read_from_master: bool,
    ) -> Result<Option<(u64, u64)>, Error> {
        self.inner.enumeration_bounds(ctx, read_from_master).await
    }

    /// Forwards to the wrapped implementation's `list_enumeration_range`,
    /// passing every argument through untouched and returning its stream.
    fn list_enumeration_range(
        &self,
        ctx: &CoreContext,
        min_id: u64,
        max_id: u64,
        sort_and_limit: Option<(SortOrder, u64)>,
        read_from_master: bool,
    ) -> BoxStream<'_, Result<(ChangesetId, u64), Error>> {
        self.inner.list_enumeration_range(
            ctx,
            min_id,
            max_id,
            sort_and_limit,
            read_from_master,
        )
    }
}