Remove Repo trait completely

Summary:
We're never going to serve RevlogRepo in production, and we're down to
a single BlobRepo type that will have different backing stores. Remove the
unused trait, and use BlobRepo everywhere except blobimport and repo_config
(both of which previously hardcoded a revlog repo; we want to switch them to a
BlobRepo once blobimport is full-fidelity).

Reviewed By: jsgf

Differential Revision: D6596164

fbshipit-source-id: ba6e76e78c495720792cbe77ae6037f7802ec126
Simon Farnsworth 2018-01-15 05:57:33 -08:00 committed by Facebook Github Bot
parent df1590c586
commit ebafde00b0
35 changed files with 364 additions and 992 deletions
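
In practice, the change collapses the generic BlobRepo<State: BlobState> into
one concrete BlobRepo whose backing stores sit behind Arc'd trait objects. A
minimal sketch of the new construction surface, based on the constructors added
below (paths illustrative):

extern crate blobrepo;

use std::path::Path;

use blobrepo::BlobRepo;

fn main() {
    // File-backed and RocksDB-backed repos are now the same Rust type;
    // the backing store is chosen by constructor, not by a type parameter.
    let _files_repo: BlobRepo = BlobRepo::new_files(Path::new("/tmp/repo-files"))
        .expect("couldn't open file-backed repo");
    let _rocks_repo: BlobRepo = BlobRepo::new_rocksdb(Path::new("/tmp/repo-rocks"))
        .expect("couldn't open rocksdb-backed repo");
}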

View File

@ -5,6 +5,7 @@
// GNU General Public License version 2 or any later version.
//! Plain files, symlinks
use std::sync::Arc;
use futures::future::Future;
use futures_ext::{BoxFuture, FutureExt};
@ -28,11 +29,11 @@ pub struct BlobEntry<B> {
ty: Type,
}
pub fn fetch_blob_from_blobstore<B>(blobstore: B, nodeid: NodeHash) -> BoxFuture<Vec<u8>, Error>
where
B: Blobstore + Clone,
{
get_node(&blobstore, nodeid)
pub fn fetch_blob_from_blobstore(
blobstore: &Arc<Blobstore>,
nodeid: NodeHash,
) -> BoxFuture<Vec<u8>, Error> {
get_node(blobstore, nodeid)
.and_then({
let blobstore = blobstore.clone();
move |node| {

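This signature change is the pattern repeated throughout the diff: a generic
store parameter becomes a shared trait object, so the function is compiled once
instead of once per store type. A standalone illustration with a simplified
stand-in trait (not the real Blobstore API):

use std::sync::Arc;

trait Store {
    fn get(&self, key: &str) -> Option<Vec<u8>>;
}

// Before: monomorphized for every concrete store type callers use.
fn fetch_generic<S: Store + Clone>(store: S, key: &str) -> Option<Vec<u8>> {
    store.get(key)
}

// After: one non-generic function; calls dispatch through the vtable.
fn fetch_dyn(store: &Arc<Store>, key: &str) -> Option<Vec<u8>> {
    store.get(key)
}
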
View File

@ -45,7 +45,6 @@ extern crate storage_types;
mod repo;
mod changeset;
mod manifest;
mod state;
mod file;
mod errors;
mod utils;
@ -55,7 +54,6 @@ pub use errors::*;
pub use changeset::BlobChangeset;
pub use manifest::BlobManifest;
pub use repo::BlobRepo;
pub use state::{BlobState, FilesBlobState, MemBlobState, RocksBlobState, TestManifoldBlobState};
//
// TODO: (jsgf) T21597565 This is exposed here for blobimport -- don't use it for anything else.

View File

@ -6,115 +6,190 @@
use std::collections::HashSet;
use std::mem;
use std::path::Path;
use std::sync::Arc;
use failure::ResultExt;
use futures::{Async, Poll};
use futures::future::Future;
use futures::stream::{self, Stream};
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use blobstore::Blobstore;
use bookmarks::Bookmarks;
use fileblob::Fileblob;
use filebookmarks::FileBookmarks;
use fileheads::FileHeads;
use filelinknodes::FileLinknodes;
use heads::Heads;
use mercurial_types::{Changeset, Manifest, NodeHash, Repo};
use linknodes::Linknodes;
use manifoldblob::ManifoldBlob;
use memblob::Memblob;
use membookmarks::MemBookmarks;
use memheads::MemHeads;
use memlinknodes::MemLinknodes;
use mercurial_types::{Changeset, Manifest, NodeHash};
use rocksblob::Rocksblob;
use storage_types::Version;
use tokio_core::reactor::Remote;
use BlobChangeset;
use BlobManifest;
use BlobState;
use errors::*;
use file::fetch_blob_from_blobstore;
pub struct BlobRepo<State> {
inner: Arc<State>,
pub struct BlobRepo {
blobstore: Arc<Blobstore>,
bookmarks: Arc<Bookmarks>,
heads: Arc<Heads>,
linknodes: Arc<Linknodes>,
}
impl<State> BlobRepo<State> {
pub fn new(state: State) -> Self {
Self {
inner: Arc::new(state),
impl BlobRepo {
pub fn new(
heads: Arc<Heads>,
bookmarks: Arc<Bookmarks>,
blobstore: Arc<Blobstore>,
linknodes: Arc<Linknodes>,
) -> Self {
BlobRepo {
heads,
bookmarks,
blobstore,
linknodes,
}
}
}
impl<State> BlobRepo<State>
where
State: BlobState,
{
pub fn get_blob(&self, key: &NodeHash) -> BoxFuture<Vec<u8>, Error> {
fetch_blob_from_blobstore(self.inner.blobstore().clone(), *key)
pub fn new_files(path: &Path) -> Result<Self> {
let heads = FileHeads::open(path.join("heads"))
.context(ErrorKind::StateOpen(StateOpenError::Heads))?;
let bookmarks = Arc::new(FileBookmarks::open(path.join("books"))
.context(ErrorKind::StateOpen(StateOpenError::Bookmarks))?);
let blobstore = Fileblob::open(path.join("blobs"))
.context(ErrorKind::StateOpen(StateOpenError::Blobstore))?;
let linknodes = Arc::new(FileLinknodes::open(path.join("linknodes"))
.context(ErrorKind::StateOpen(StateOpenError::Linknodes))?);
Ok(Self::new(
Arc::new(heads),
Arc::new(bookmarks),
Arc::new(blobstore),
Arc::new(linknodes),
))
}
}
impl<State> Repo for BlobRepo<State>
where
State: BlobState,
{
fn get_changesets(&self) -> BoxStream<NodeHash, Error> {
pub fn new_rocksdb(path: &Path) -> Result<Self> {
let heads = FileHeads::open(path.join("heads"))
.context(ErrorKind::StateOpen(StateOpenError::Heads))?;
let bookmarks = FileBookmarks::open(path.join("books"))
.context(ErrorKind::StateOpen(StateOpenError::Bookmarks))?;
let blobstore = Rocksblob::open(path.join("blobs"))
.context(ErrorKind::StateOpen(StateOpenError::Blobstore))?;
let linknodes = FileLinknodes::open(path.join("linknodes"))
.context(ErrorKind::StateOpen(StateOpenError::Linknodes))?;
Ok(Self::new(
Arc::new(heads),
Arc::new(bookmarks),
Arc::new(blobstore),
Arc::new(linknodes),
))
}
pub fn new_memblob(
heads: MemHeads,
bookmarks: MemBookmarks,
blobstore: Memblob,
linknodes: MemLinknodes,
) -> Self {
Self::new(
Arc::new(heads),
Arc::new(bookmarks),
Arc::new(blobstore),
Arc::new(linknodes),
)
}
pub fn new_test_manifold<T: ToString>(bucket: T, remote: &Remote) -> Result<Self> {
let heads = MemHeads::new();
let bookmarks = MemBookmarks::new();
let blobstore = ManifoldBlob::new_may_panic(bucket.to_string(), remote);
let linknodes = MemLinknodes::new();
Ok(Self::new(
Arc::new(heads),
Arc::new(bookmarks),
Arc::new(blobstore),
Arc::new(linknodes),
))
}
pub fn get_blob(&self, key: &NodeHash) -> BoxFuture<Vec<u8>, Error> {
fetch_blob_from_blobstore(&self.blobstore, *key)
}
pub fn get_changesets(&self) -> BoxStream<NodeHash, Error> {
BlobChangesetStream {
repo: BlobRepo {
inner: self.inner.clone(),
},
heads: self.inner.heads().heads().boxify(),
repo: self.clone(),
heads: self.heads.heads().boxify(),
state: BCState::Idle,
seen: HashSet::new(),
}.boxify()
}
fn get_heads(&self) -> BoxStream<NodeHash, Error> {
self.inner.heads().heads().boxify()
pub fn get_heads(&self) -> BoxStream<NodeHash, Error> {
self.heads.heads().boxify()
}
fn changeset_exists(&self, nodeid: &NodeHash) -> BoxFuture<bool, Error> {
BlobChangeset::load(self.inner.blobstore(), nodeid)
pub fn changeset_exists(&self, nodeid: &NodeHash) -> BoxFuture<bool, Error> {
BlobChangeset::load(&self.blobstore, nodeid)
.map(|cs| cs.is_some())
.boxify()
}
fn get_changeset_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error> {
pub fn get_changeset_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error> {
let nodeid = *nodeid;
BlobChangeset::load(self.inner.blobstore(), &nodeid)
.and_then(move |cs| {
cs.ok_or(ErrorKind::ChangesetMissing(nodeid).into())
})
BlobChangeset::load(&self.blobstore, &nodeid)
.and_then(move |cs| cs.ok_or(ErrorKind::ChangesetMissing(nodeid).into()))
.map(|cs| cs.boxed())
.boxify()
}
fn get_manifest_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Manifest + Sync>, Error> {
pub fn get_manifest_by_nodeid(
&self,
nodeid: &NodeHash,
) -> BoxFuture<Box<Manifest + Sync>, Error> {
let nodeid = *nodeid;
BlobManifest::load(self.inner.blobstore(), &nodeid)
.and_then(move |mf| {
mf.ok_or(ErrorKind::ManifestMissing(nodeid).into())
})
BlobManifest::load(&self.blobstore, &nodeid)
.and_then(move |mf| mf.ok_or(ErrorKind::ManifestMissing(nodeid).into()))
.map(|m| m.boxed())
.boxify()
}
fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error> {
self.inner.bookmarks().keys().boxify()
pub fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error> {
self.bookmarks.keys().boxify()
}
fn get_bookmark_value(
pub fn get_bookmark_value(
&self,
key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error> {
self.inner.bookmarks().get(key).boxify()
self.bookmarks.get(key).boxify()
}
}
impl<State> Clone for BlobRepo<State> {
impl Clone for BlobRepo {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
heads: self.heads.clone(),
bookmarks: self.bookmarks.clone(),
blobstore: self.blobstore.clone(),
linknodes: self.linknodes.clone(),
}
}
}
pub struct BlobChangesetStream<State>
where
State: BlobState,
{
repo: BlobRepo<State>,
pub struct BlobChangesetStream {
repo: BlobRepo,
seen: HashSet<NodeHash>,
heads: BoxStream<NodeHash, Error>,
state: BCState,
@ -125,10 +200,7 @@ enum BCState {
WaitCS(NodeHash, BoxFuture<Box<Changeset>, Error>),
}
impl<State> Stream for BlobChangesetStream<State>
where
State: BlobState,
{
impl Stream for BlobChangesetStream {
type Item = NodeHash;
type Error = Error;

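With the Repo trait gone, everything above is an inherent method on BlobRepo.
A sketch of a caller, assuming the futures 0.1 combinators used throughout this
codebase (path illustrative):

extern crate blobrepo;
extern crate futures;

use std::path::Path;

use blobrepo::BlobRepo;
use futures::{Future, Stream};

fn main() {
    let repo = BlobRepo::new_rocksdb(Path::new("/tmp/repo"))
        .expect("couldn't open blob state");

    // Previously Repo::get_heads(); now called directly, no trait import.
    let heads = repo.get_heads()
        .collect()
        .wait()
        .expect("failed to fetch heads");
    for head in heads {
        println!("head: {}", head);
    }
}
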
View File

@ -1,202 +0,0 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::path::Path;
use std::sync::Arc;
use failure::ResultExt;
use blobstore::Blobstore;
use bookmarks::Bookmarks;
use fileblob::Fileblob;
use filebookmarks::FileBookmarks;
use fileheads::FileHeads;
use filelinknodes::FileLinknodes;
use heads::Heads;
use linknodes::Linknodes;
use manifoldblob::ManifoldBlob;
use memblob::Memblob;
use membookmarks::MemBookmarks;
use memheads::MemHeads;
use memlinknodes::MemLinknodes;
use rocksblob::Rocksblob;
use tokio_core::reactor::Remote;
use errors::*;
/// Represents all the state used by a blob store.
pub trait BlobState: 'static + Send + Sync {
type Heads: Heads + Sync;
type Bookmarks: Bookmarks + Clone + Sync;
type Blobstore: Blobstore + Clone + Sync;
type Linknodes: Linknodes + Clone;
fn heads(&self) -> &Self::Heads;
fn bookmarks(&self) -> &Self::Bookmarks;
fn blobstore(&self) -> &Self::Blobstore;
fn linknodes(&self) -> &Self::Linknodes;
}
macro_rules! impl_blob_state {
{
$struct_type: ident {
heads: $head_type: ty,
bookmarks: $book_type: ty,
blobstore: $blob_type: ty,
linknodes: $link_type: ty,
}
} => {
pub struct $struct_type {
heads: $head_type,
bookmarks: $book_type,
blobstore: $blob_type,
linknodes: $link_type,
}
impl BlobState for $struct_type {
type Heads = $head_type;
type Bookmarks = $book_type;
type Blobstore = $blob_type;
type Linknodes = $link_type;
#[inline]
fn heads(&self) -> &Self::Heads {
&self.heads
}
#[inline]
fn bookmarks(&self) -> &Self::Bookmarks {
&self.bookmarks
}
#[inline]
fn blobstore(&self) -> &Self::Blobstore {
&self.blobstore
}
#[inline]
fn linknodes(&self) -> &Self::Linknodes {
&self.linknodes
}
}
}
}
impl_blob_state! {
FilesBlobState {
heads: FileHeads,
bookmarks: Arc<FileBookmarks>,
blobstore: Fileblob,
linknodes: Arc<FileLinknodes>,
}
}
impl FilesBlobState {
pub fn new(path: &Path) -> Result<Self> {
let heads = FileHeads::open(path.join("heads"))
.context(ErrorKind::StateOpen(StateOpenError::Heads))?;
let bookmarks = Arc::new(
FileBookmarks::open(path.join("books"))
.context(ErrorKind::StateOpen(StateOpenError::Bookmarks))?,
);
let blobstore = Fileblob::open(path.join("blobs"))
.context(ErrorKind::StateOpen(StateOpenError::Blobstore))?;
let linknodes = Arc::new(
FileLinknodes::open(path.join("linknodes"))
.context(ErrorKind::StateOpen(StateOpenError::Linknodes))?,
);
Ok(FilesBlobState {
heads,
bookmarks,
blobstore,
linknodes,
})
}
}
impl_blob_state! {
RocksBlobState {
heads: FileHeads,
bookmarks: Arc<FileBookmarks>,
blobstore: Rocksblob,
linknodes: Arc<FileLinknodes>,
}
}
impl RocksBlobState {
pub fn new(path: &Path) -> Result<Self> {
let heads = FileHeads::open(path.join("heads"))
.context(ErrorKind::StateOpen(StateOpenError::Heads))?;
let bookmarks = Arc::new(
FileBookmarks::open(path.join("books"))
.context(ErrorKind::StateOpen(StateOpenError::Bookmarks))?,
);
let blobstore = Rocksblob::open(path.join("blobs"))
.context(ErrorKind::StateOpen(StateOpenError::Blobstore))?;
let linknodes = Arc::new(
FileLinknodes::open(path.join("linknodes"))
.context(ErrorKind::StateOpen(StateOpenError::Linknodes))?,
);
Ok(RocksBlobState {
heads,
bookmarks,
blobstore,
linknodes,
})
}
}
impl_blob_state! {
MemBlobState {
heads: MemHeads,
bookmarks: Arc<MemBookmarks>,
blobstore: Memblob,
linknodes: Arc<MemLinknodes>,
}
}
impl MemBlobState {
pub fn new(
heads: MemHeads,
bookmarks: MemBookmarks,
blobstore: Memblob,
linknodes: MemLinknodes,
) -> Self {
MemBlobState {
heads,
bookmarks: Arc::new(bookmarks),
blobstore,
linknodes: Arc::new(linknodes),
}
}
}
impl_blob_state! {
TestManifoldBlobState {
heads: MemHeads,
bookmarks: Arc<MemBookmarks>,
blobstore: ManifoldBlob,
linknodes: Arc<MemLinknodes>,
}
}
impl TestManifoldBlobState {
pub fn new<T: ToString>(bucket: T, remote: &Remote) -> Result<Self> {
let heads = MemHeads::new();
let bookmarks = Arc::new(MemBookmarks::new());
let blobstore = ManifoldBlob::new_may_panic(bucket.to_string(), remote);
let linknodes = Arc::new(MemLinknodes::new());
Ok(TestManifoldBlobState {
heads,
bookmarks,
blobstore,
linknodes,
})
}
}
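
For reference while reading the deletion: each impl_blob_state! invocation
expanded to a concrete state struct plus a BlobState impl wiring up the four
associated types. The reconstructed expansion for FilesBlobState (accessor
bodies shortened to one line each):

pub struct FilesBlobState {
    heads: FileHeads,
    bookmarks: Arc<FileBookmarks>,
    blobstore: Fileblob,
    linknodes: Arc<FileLinknodes>,
}

impl BlobState for FilesBlobState {
    type Heads = FileHeads;
    type Bookmarks = Arc<FileBookmarks>;
    type Blobstore = Fileblob;
    type Linknodes = Arc<FileLinknodes>;

    fn heads(&self) -> &Self::Heads { &self.heads }
    fn bookmarks(&self) -> &Self::Bookmarks { &self.bookmarks }
    fn blobstore(&self) -> &Self::Blobstore { &self.blobstore }
    fn linknodes(&self) -> &Self::Linknodes { &self.linknodes }
}

The trait-object BlobRepo makes all four generated structs, and the type
parameter they fed, unnecessary.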

View File

@ -21,10 +21,7 @@ pub struct RawNodeBlob {
pub blob: BlobHash,
}
pub fn get_node<B>(blobstore: &B, nodeid: NodeHash) -> BoxFuture<RawNodeBlob, Error>
where
B: Blobstore,
{
pub fn get_node(blobstore: &Blobstore, nodeid: NodeHash) -> BoxFuture<RawNodeBlob, Error> {
let key = format!("node-{}.bincode", nodeid);
blobstore

View File

@ -55,7 +55,7 @@ use std::sync::Arc;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use blobrepo::{BlobRepo, BlobState, RocksBlobState, TestManifoldBlobState};
use blobrepo::BlobRepo;
use clap::App;
use futures::{Future, IntoFuture, Stream};
use futures::sync::oneshot;
@ -64,7 +64,7 @@ use futures_ext::{BoxFuture, FutureExt};
use futures_stats::{Stats, Timed};
use hyper::StatusCode;
use hyper::server::{Http, Request, Response, Service};
use mercurial_types::{NodeHash, Repo};
use mercurial_types::NodeHash;
use native_tls::TlsAcceptor;
use native_tls::backend::openssl::TlsAcceptorBuilderExt;
use openssl::ssl::{SSL_VERIFY_FAIL_IF_NO_PEER_CERT, SSL_VERIFY_PEER};
@ -75,7 +75,7 @@ use tokio_tls::TlsAcceptorExt;
pub use failure::{DisplayChain, Error, Result, ResultExt};
type NameToRepo<State> = HashMap<String, Arc<BlobRepo<State>>>;
type NameToRepo = HashMap<String, Arc<BlobRepo>>;
type UrlParseFunc = fn(Captures) -> Result<ParsedUrl>;
struct Route(Regex, UrlParseFunc);
@ -161,7 +161,6 @@ lazy_static! {
};
}
#[derive(Serialize)]
struct TreeMetadata {
hash: NodeHash,
@ -200,23 +199,18 @@ struct TreeMetadataOptions {
fetch_size: bool,
}
struct EdenServer<State> {
name_to_repo: NameToRepo<State>,
struct EdenServer {
name_to_repo: NameToRepo,
cpupool: Arc<CpuPool>,
logger: Logger,
scuba: Arc<ScubaClient>,
}
impl<State> EdenServer<State>
impl EdenServer
where
EdenServer<State>: Service,
State: BlobState,
EdenServer: Service,
{
fn new(
name_to_repo: NameToRepo<State>,
cpupool: Arc<CpuPool>,
logger: Logger,
) -> EdenServer<State> {
fn new(name_to_repo: NameToRepo, cpupool: Arc<CpuPool>, logger: Logger) -> EdenServer {
EdenServer {
name_to_repo,
cpupool,
@ -308,10 +302,7 @@ fn add_common_stats(sample: &mut ScubaSample, stats: &Stats) {
sample.add(SCUBA_COL_POLL_COUNT, stats.poll_count);
}
impl<State> Service for EdenServer<State>
where
State: BlobState,
{
impl Service for EdenServer {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
@ -387,7 +378,6 @@ where
}
}
// Builds an acceptor that has `accept_async()` method that handles tls handshake
// and returns decrypted stream.
fn build_tls_acceptor(ssl: Ssl) -> Result<TlsAcceptor> {
@ -413,13 +403,9 @@ fn build_tls_acceptor(ssl: Ssl) -> Result<TlsAcceptor> {
tlsacceptor_builder.build().map_err(Error::from)
}
fn start_server<State>(addr: &str, reponame: String, state: State, logger: Logger, ssl: Ssl)
where
State: BlobState,
{
fn start_server(addr: &str, reponame: String, repo: BlobRepo, logger: Logger, ssl: Ssl) {
let addr = addr.parse().expect("Failed to parse address");
let mut map = HashMap::new();
let repo = BlobRepo::new(state);
map.insert(reponame, Arc::new(repo));
let tlsacceptor = build_tls_acceptor(ssl);
@ -462,6 +448,7 @@ where
/// Types of repositories supported
#[derive(Clone, Debug, Deserialize)]
enum RawRepoType {
#[serde(rename = "blob:files")] BlobFiles,
#[serde(rename = "blob:rocks")] BlobRocks,
#[serde(rename = "blob:manifold")] BlobManifold,
}
@ -516,10 +503,18 @@ fn main() {
};
match config.repotype {
RawRepoType::BlobFiles => start_server(
&config.addr,
config.reponame,
BlobRepo::new_files(&config.path.expect("Please specify a path to the blobrepo"))
.expect("couldn't open blob state"),
root_logger.clone(),
config.ssl,
),
RawRepoType::BlobRocks => start_server(
&config.addr,
config.reponame,
RocksBlobState::new(&config.path.expect("Please specify a path to the blobrepo"))
BlobRepo::new_rocksdb(&config.path.expect("Please specify a path to the blobrepo"))
.expect("couldn't open blob state"),
root_logger.clone(),
config.ssl,
@ -544,7 +539,7 @@ fn main() {
start_server(
&config.addr,
config.reponame,
TestManifoldBlobState::new(
BlobRepo::new_test_manifold(
config
.manifold_bucket
.expect("manifold bucket is not specified"),

View File

@ -17,7 +17,7 @@ use mercurial_types::NodeHash;
/// Trait representing the interface to a heads store, which more generally is just
/// a set of commit identifiers.
pub trait Heads: Send + 'static {
pub trait Heads: Send + Sync + 'static {
// Heads are not guaranteed to be returned in any particular order. Heads that exist for
// the entire duration of the traversal are guaranteed to appear at least once.

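The new Sync bound is what lets BlobRepo hold an Arc<Heads> and still move
between server threads: Arc<T> is Send only when T is Send + Sync. A standalone
illustration with a stand-in trait (not the real Heads API):

use std::sync::Arc;

trait Heads: Send + Sync + 'static {}

fn assert_send<T: Send>(_: &T) {}

fn share_across_threads(heads: Arc<Heads>) {
    // Compiles only because the trait object is Send + Sync.
    assert_send(&heads);
}
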
View File

@ -13,4 +13,5 @@ pub enum ErrorKind {
#[fail(display = "malformed batch with command '{}'", _0)] BatchInvalid(String),
#[fail(display = "unknown escape character in batch command '{}'", _0)] BatchEscape(u8),
#[fail(display = "Repo error")] RepoError,
#[fail(display = "cannot serve revlog repos")] CantServeRevlogRepo,
}

View File

@ -17,10 +17,14 @@ extern crate maplit;
#[cfg(test)]
extern crate tempdir;
extern crate blobrepo;
extern crate hlua_futures;
extern crate mercurial;
extern crate mercurial_types;
#[cfg(test)]
extern crate linear;
mod errors;
use std::collections::HashMap;
@ -31,8 +35,9 @@ use failure::ResultExt;
use futures::Future;
use hlua::{AnyLuaValue, Lua, LuaError, PushGuard};
use blobrepo::BlobRepo;
use hlua_futures::{AnyFuture, LuaCoroutine, LuaCoroutineBuilder};
use mercurial_types::{Changeset, NodeHash, Repo};
use mercurial_types::{Changeset, NodeHash};
pub use errors::*;
@ -49,14 +54,14 @@ pub struct HookManager<'lua> {
lua: Lua<'lua>,
}
pub struct HookContext<'hook, R: Repo> {
pub struct HookContext<'hook> {
name: &'hook str,
repo: Arc<R>,
repo: Arc<BlobRepo>,
info: HashMap<&'static str, String>,
code: &'hook str,
}
impl<'hook, R: Repo> HookContext<'hook, R> {
impl<'hook> HookContext<'hook> {
fn run<'a, 'lua>(
&self,
lua: &'a mut Lua<'lua>,
@ -65,19 +70,14 @@ impl<'hook, R: Repo> HookContext<'hook, R> {
let name = self.name.to_string();
let get_author = move |hash: String| -> Result<AnyFuture> {
let hash = hash.into_ascii_string().map_err(|hash| {
ErrorKind::InvalidHash(name.clone(), hash.into_source())
})?;
let hash = hash.into_ascii_string()
.map_err(|hash| ErrorKind::InvalidHash(name.clone(), hash.into_source()))?;
let hash = NodeHash::from_ascii_str(&hash)
.with_context(|_| ErrorKind::InvalidHash(name.clone(), hash.into()))?;
let future = repo.get_changeset_by_nodeid(&hash)
.map_err(|err| {
LuaError::ExecutionError(format!("failed to get author: {}", err))
})
.map(|cs| {
AnyLuaValue::LuaString(String::from_utf8_lossy(cs.user()).into_owned())
});
.map_err(|err| LuaError::ExecutionError(format!("failed to get author: {}", err)))
.map(|cs| AnyLuaValue::LuaString(String::from_utf8_lossy(cs.user()).into_owned()));
Ok(AnyFuture::new(future))
};
lua.set("get_author", hlua::function1(get_author));
@ -108,9 +108,9 @@ impl<'lua> HookManager<'lua> {
HookManager { lua }
}
pub fn run_hook<'hook, R: Repo>(
pub fn run_hook<'hook>(
&mut self,
hook: HookContext<'hook, R>,
hook: HookContext<'hook>,
) -> Result<LuaCoroutine<PushGuard<&mut Lua<'lua>>, bool>> {
// TODO: with multiple Lua contexts, choose a context to run in. Probably use a queue or
// something.
@ -120,30 +120,18 @@ impl<'lua> HookManager<'lua> {
#[cfg(test)]
mod test {
use std::fs::File;
use std::path::Path;
use std::process::Command;
use tempdir::TempDir;
use super::*;
#[test]
fn test_hook() {
let (hash, dir) = create_repo();
let dot_hg = dir.as_ref().join(".hg");
let hook_info = hashmap! {
"repo" => "fbsource".into(),
"bookmark" => "master".into(),
"old_hash" => "0000000000000000000000000000000000000000".into(),
"new_hash" => hash,
"new_hash" => "a5ffa77602a066db7d5cfb9fb5823a0895717c5a".into(),
};
let mut hook_manager = HookManager::new();
let repo = match mercurial::RevlogRepo::open(&dot_hg) {
Ok(repo) => repo,
Err(err) => panic!("RevlogRepo::open({}) failed {:?}", dot_hg.display(), err),
};
let repo = linear::getrepo();
let hook = HookContext {
name: "test",
repo: Arc::new(repo),
@ -154,7 +142,7 @@ mod test {
return false
else
author = coroutine.yield(get_author(info.new_hash))
return author == \"testuser\"
return author == \"Jeremy Fitzhardinge <jsgf@fb.com>\"
end
end",
};
@ -163,53 +151,4 @@ mod test {
let result = coroutine_fut.wait();
assert!(result.unwrap());
}
fn create_repo() -> (String, TempDir) {
// XXX replace this with a valid prebuilt repo
let dir = TempDir::new("mononoke-hooks").unwrap();
let status = Command::new("hg")
.arg("init")
.current_dir(&dir)
.status()
.expect("hg init failed");
assert!(status.success());
{
let new_file = dir.as_ref().join("foo.txt");
File::create(new_file).unwrap();
}
let status = hg_cmd(&dir)
.arg("add")
.arg("foo.txt")
.status()
.expect("hg add failed");
assert!(status.success());
let status = hg_cmd(&dir)
.arg("commit")
.arg("-utestuser")
.arg("-mtest")
.status()
.expect("hg commit failed");
assert!(status.success());
// Get the new hash and return it.
let output = hg_cmd(&dir)
.arg("log")
.arg("-r.")
.arg("-T{node}")
.output()
.expect("hg log failed");
assert!(output.status.success());
let stdout = output.stdout;
(String::from_utf8(stdout).unwrap(), dir)
}
fn hg_cmd<P: AsRef<Path>>(dir: P) -> Command {
let mut command = Command::new("hg");
command.env("HGPLAIN", "1");
command.current_dir(dir);
command
}
}
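
A sketch of the new non-generic hook flow, mirroring the rewritten test above
(the Lua body, hook entry point, and info map are illustrative; direct field
construction works because the sketch lives inside the crate):

use std::sync::Arc;

use futures::Future;

#[test]
fn run_example_hook() {
    let repo = Arc::new(linear::getrepo());
    let mut hook_manager = HookManager::new();
    let hook = HookContext {
        name: "example",
        repo,
        // The real test also passes repo, bookmark and old_hash.
        info: hashmap! { "new_hash" => "a5ffa77602a066db7d5cfb9fb5823a0895717c5a".into() },
        code: "hook = function(info, files)\n    return true\nend",
    };
    let coroutine_fut = hook_manager.run_hook(hook).expect("couldn't start hook");
    assert!(coroutine_fut.wait().expect("hook failed"));
}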

View File

@ -5,19 +5,10 @@
// GNU General Public License version 2 or any later version.
use std::collections::BTreeMap;
use std::ops::Deref;
use futures::{Future, Stream};
use futures::stream;
use blobnode::Parents;
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use node::Node;
use nodehash::NodeHash;
use path::MPath;
use repo::Repo;
use errors::*;
pub trait Changeset: Send + 'static {
fn manifestid(&self) -> &NodeHash;
@ -71,58 +62,3 @@ pub struct Time {
pub time: u64,
pub tz: i32,
}
#[derive(Debug)]
pub struct RepoChangeset<R> {
repo: R,
csid: NodeHash,
}
impl<R> RepoChangeset<R> {
pub fn new(repo: R, csid: NodeHash) -> Self {
Self { repo, csid }
}
pub fn get_csid(&self) -> &NodeHash {
&self.csid
}
}
impl<R> AsRef<NodeHash> for RepoChangeset<R> {
fn as_ref(&self) -> &NodeHash {
self.get_csid()
}
}
impl<R> Deref for RepoChangeset<R> {
type Target = R;
fn deref(&self) -> &Self::Target {
&self.repo
}
}
impl<R> Node for RepoChangeset<R>
where
R: Repo + Clone,
{
type Content = Box<Changeset>;
type GetParents = BoxStream<Self, Error>;
type GetContent = BoxFuture<Self::Content, Error>;
fn get_parents(&self) -> Self::GetParents {
self.repo.get_changeset_by_nodeid(&self.csid) // Future<Changeset>
.map(|cs| stream::iter_ok(cs.parents().into_iter())) // Future<Stream<>>
.flatten_stream() // Stream<NodeHash>
.map({
let repo = self.repo.clone();
move |p| Self::new(repo.clone(), p)
}) // Stream<Self>
.boxify()
}
fn get_content(&self) -> Self::GetContent {
self.repo.get_changeset_by_nodeid(&self.csid).boxify()
}
}

View File

@ -48,7 +48,6 @@ pub mod hash;
pub mod nodehash;
pub mod path;
pub mod utils;
pub mod repo;
pub mod manifest;
pub mod blob;
pub mod blobnode;
@ -63,7 +62,6 @@ pub use manifest::{Entry, Manifest, Type};
pub use node::Node;
pub use nodehash::{NodeHash, NULL_HASH};
pub use path::{fncache_fsencode, simple_fsencode, MPath, MPathElement, RepoPath};
pub use repo::{BoxRepo, Repo};
pub use utils::percent_encode;
pub use errors::{Error, ErrorKind};

View File

@ -1,302 +0,0 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use std::sync::Arc;
use futures::Future;
use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use changeset::Changeset;
use manifest::{BoxManifest, Manifest};
use nodehash::NodeHash;
use storage_types::Version;
use errors::*;
pub trait Repo: Send + Sync + 'static {
/// Return a stream of all changeset ids
///
/// This returns a Stream which produces each changeset that's reachable from a
/// head exactly once. This does not guarantee any particular order, but something
/// approximating a BFS traversal from the heads would be ideal.
///
/// XXX Is "exactly once" too strong? This probably requires a "has seen" structure which
/// will be O(changesets) in size. Probably OK up to 10-100M changesets.
fn get_changesets(&self) -> BoxStream<NodeHash, Error>;
fn get_heads(&self) -> BoxStream<NodeHash, Error>;
fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error>;
fn get_bookmark_value(
&self,
key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error>;
fn changeset_exists(&self, nodeid: &NodeHash) -> BoxFuture<bool, Error>;
fn get_changeset_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error>;
fn get_manifest_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Manifest + Sync>, Error>;
fn boxed(self) -> Box<Repo + Sync>
where
Self: Sync + Sized,
{
Box::new(self)
}
}
pub struct BoxRepo<R>
where
R: Repo,
{
repo: R,
}
impl<R> BoxRepo<R>
where
R: Repo + Sync + Send,
{
pub fn new(repo: R) -> Box<Repo + Sync + Send> {
let br = BoxRepo { repo };
Box::new(br)
}
}
impl<R> Repo for BoxRepo<R>
where
R: Repo + Sync + Send + 'static,
{
fn get_changesets(&self) -> BoxStream<NodeHash, Error> {
self.repo.get_changesets().boxify()
}
fn get_heads(&self) -> BoxStream<NodeHash, Error> {
self.repo.get_heads().boxify()
}
fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error> {
self.repo.get_bookmark_keys().boxify()
}
fn get_bookmark_value(
&self,
key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error> {
self.repo.get_bookmark_value(key).boxify()
}
fn changeset_exists(&self, nodeid: &NodeHash) -> BoxFuture<bool, Error> {
self.repo.changeset_exists(nodeid).boxify()
}
fn get_changeset_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error> {
self.repo.get_changeset_by_nodeid(nodeid).boxify()
}
fn get_manifest_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Manifest + Sync>, Error> {
self.repo
.get_manifest_by_nodeid(nodeid)
.map(move |m| BoxManifest::new(m))
.boxify()
}
}
impl Repo for Box<Repo + Sync + Send> {
fn get_changesets(&self) -> BoxStream<NodeHash, Error> {
(**self).get_changesets()
}
fn get_heads(&self) -> BoxStream<NodeHash, Error> {
(**self).get_heads()
}
fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error> {
(**self).get_bookmark_keys()
}
fn get_bookmark_value(
&self,
key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error> {
(**self).get_bookmark_value(key)
}
fn changeset_exists(&self, nodeid: &NodeHash) -> BoxFuture<bool, Error> {
(**self).changeset_exists(nodeid)
}
fn get_changeset_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error> {
(**self).get_changeset_by_nodeid(nodeid)
}
fn get_manifest_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Manifest + Sync>, Error> {
(**self).get_manifest_by_nodeid(nodeid)
}
}
impl<R> Repo for Box<R>
where
R: Repo,
{
fn get_changesets(&self) -> BoxStream<NodeHash, Error> {
(**self).get_changesets()
}
fn get_heads(&self) -> BoxStream<NodeHash, Error> {
(**self).get_heads()
}
fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error> {
(**self).get_bookmark_keys()
}
fn get_bookmark_value(
&self,
key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error> {
(**self).get_bookmark_value(key)
}
fn changeset_exists(&self, nodeid: &NodeHash) -> BoxFuture<bool, Error> {
(**self).changeset_exists(nodeid)
}
fn get_changeset_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error> {
(**self).get_changeset_by_nodeid(nodeid)
}
fn get_manifest_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Manifest + Sync>, Error> {
(**self).get_manifest_by_nodeid(nodeid)
}
}
impl Repo for Arc<Repo> {
fn get_changesets(&self) -> BoxStream<NodeHash, Error> {
(**self).get_changesets()
}
fn get_heads(&self) -> BoxStream<NodeHash, Error> {
(**self).get_heads()
}
fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error> {
(**self).get_bookmark_keys()
}
fn get_bookmark_value(
&self,
key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error> {
(**self).get_bookmark_value(key)
}
fn changeset_exists(&self, nodeid: &NodeHash) -> BoxFuture<bool, Error> {
(**self).changeset_exists(nodeid)
}
fn get_changeset_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error> {
(**self).get_changeset_by_nodeid(nodeid)
}
fn get_manifest_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Manifest + Sync>, Error> {
(**self).get_manifest_by_nodeid(nodeid)
}
}
impl<R> Repo for Arc<R>
where
R: Repo,
{
fn get_changesets(&self) -> BoxStream<NodeHash, Error> {
(**self).get_changesets()
}
fn get_heads(&self) -> BoxStream<NodeHash, Error> {
(**self).get_heads()
}
fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error> {
(**self).get_bookmark_keys()
}
fn get_bookmark_value(
&self,
key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error> {
(**self).get_bookmark_value(key)
}
fn changeset_exists(&self, nodeid: &NodeHash) -> BoxFuture<bool, Error> {
(**self).changeset_exists(nodeid)
}
fn get_changeset_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error> {
(**self).get_changeset_by_nodeid(nodeid)
}
fn get_manifest_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Manifest + Sync>, Error> {
(**self).get_manifest_by_nodeid(nodeid)
}
}
#[cfg(test)]
mod test {
use super::*;
#[derive(Copy, Clone)]
struct DummyRepo;
impl Repo for DummyRepo {
fn get_changesets(&self) -> BoxStream<NodeHash, Error> {
unimplemented!("dummy impl")
}
fn get_heads(&self) -> BoxStream<NodeHash, Error> {
unimplemented!("dummy impl")
}
fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error> {
unimplemented!("dummy impl")
}
fn get_bookmark_value(
&self,
_key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error> {
unimplemented!("dummy impl")
}
fn changeset_exists(&self, _nodeid: &NodeHash) -> BoxFuture<bool, Error> {
unimplemented!("dummy impl")
}
fn get_changeset_by_nodeid(&self, _nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error> {
unimplemented!("dummy impl")
}
fn get_manifest_by_nodeid(
&self,
_nodeid: &NodeHash,
) -> BoxFuture<Box<Manifest + Sync>, Error> {
unimplemented!("dummy impl")
}
}
#[test]
fn test_impl() {
fn _assert_repo<T: Repo>(_: &T) {}
let repo = DummyRepo;
let a = Arc::new(repo);
let b = Box::new(repo);
_assert_repo(&repo);
_assert_repo(&a);
_assert_repo(&(a as Arc<Repo>));
_assert_repo(&b);
_assert_repo(&(b as Box<Repo + Sync + Send>));
}
}
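
The blanket impls being deleted here existed so that Box<Repo> and Arc<Repo>
could stand in wherever a Repo was expected. With a single concrete type, plain
Clone covers that use, since BlobRepo's fields are already Arcs. A sketch using
the in-memory constructors from this diff:

extern crate blobrepo;
extern crate memblob;
extern crate membookmarks;
extern crate memheads;
extern crate memlinknodes;

use blobrepo::BlobRepo;
use memblob::Memblob;
use membookmarks::MemBookmarks;
use memheads::MemHeads;
use memlinknodes::MemLinknodes;

fn shared_repo() -> (BlobRepo, BlobRepo) {
    let repo = BlobRepo::new_memblob(
        MemHeads::new(),
        MemBookmarks::new(),
        Memblob::new(),
        MemLinknodes::new(),
    );
    // Clone just bumps four Arc refcounts; both handles share the same stores.
    let clone = repo.clone();
    (repo, clone)
}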

View File

@ -7,4 +7,4 @@
pub mod revlog;
pub use self::revlog::{Details, RevlogManifest};
pub use mercurial_types::{Manifest, Repo};
pub use mercurial_types::Manifest;

View File

@ -20,8 +20,8 @@ use futures_ext::{BoxFuture, BoxStream, FutureExt, StreamExt};
use asyncmemo::{Asyncmemo, Filler};
use bookmarks::Bookmarks;
use mercurial_types::{fncache_fsencode, simple_fsencode, BlobNode, Changeset, MPath, MPathElement,
Manifest, NodeHash, Repo, RepoPath, NULL_HASH};
use mercurial_types::{fncache_fsencode, simple_fsencode, BlobNode, MPath, MPathElement, NodeHash,
RepoPath, NULL_HASH};
use stockbookmarks::StockBookmarks;
use storage_types::Version;
@ -125,7 +125,6 @@ pub struct RevlogRepo {
// memory in total: half for filelogs and half for revlogs.
}
pub struct RevlogRepoOptions {
pub inmemory_logs_capacity: usize,
}
@ -384,6 +383,16 @@ impl RevlogRepo {
Ok(StockBookmarks::read(self.basepath.clone())?)
}
pub fn get_bookmark_value(
&self,
key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error> {
match self.bookmarks() {
Ok(b) => b.get(key).boxify(),
Err(e) => future::err(e).boxify(),
}
}
pub fn changesets(&self) -> ChangesetStream {
ChangesetStream::new(&self.changelog)
}
@ -446,46 +455,3 @@ impl Stream for ChangesetStream {
}
}
}
impl Repo for RevlogRepo {
fn get_heads(&self) -> BoxStream<NodeHash, Error> {
self.get_heads().boxify()
}
fn get_bookmark_keys(&self) -> BoxStream<Vec<u8>, Error> {
match self.bookmarks() {
Ok(bms) => bms.keys().from_err().boxify(),
Err(e) => stream::once(Err(e.into())).boxify(),
}
}
fn get_bookmark_value(
&self,
key: &AsRef<[u8]>,
) -> BoxFuture<Option<(NodeHash, Version)>, Error> {
match self.bookmarks() {
Ok(bms) => bms.get(key).from_err().boxify(),
Err(e) => future::err(e.into()).boxify(),
}
}
fn get_changesets(&self) -> BoxStream<NodeHash, Error> {
self.changesets().boxify()
}
fn changeset_exists(&self, nodeid: &NodeHash) -> BoxFuture<bool, Error> {
RevlogRepo::changeset_exists(self, nodeid).boxify()
}
fn get_changeset_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Changeset>, Error> {
RevlogRepo::get_changeset_by_nodeid(self, nodeid)
.map(|cs| cs.boxed())
.boxify()
}
fn get_manifest_by_nodeid(&self, nodeid: &NodeHash) -> BoxFuture<Box<Manifest + Sync>, Error> {
RevlogRepo::get_manifest_by_nodeid(self, nodeid)
.map(|m| m.boxed())
.boxify()
}
}

View File

@ -10,6 +10,7 @@
#![deny(warnings)]
#![feature(try_from)]
extern crate blobrepo;
#[macro_use]
extern crate failure_ext as failure;
extern crate futures;

View File

@ -14,7 +14,9 @@ use std::str::from_utf8;
use futures::{future, Future, IntoFuture};
use mercurial_types::{MPath, Manifest, NodeHash, Repo};
use blobrepo::BlobRepo;
use mercurial::RevlogRepo;
use mercurial_types::{Changeset, MPath, Manifest, NodeHash};
use mercurial_types::manifest::Content;
use mercurial_types::path::MPathElement;
use toml;
@ -60,7 +62,20 @@ pub struct RepoConfigs {
impl RepoConfigs {
/// Read the config repo and generate RepoConfigs based on it
pub fn read_config_repo(
repo: Box<Repo + Send>,
repo: BlobRepo,
changeset_hash: NodeHash,
) -> Box<Future<Item = Self, Error = Error> + Send> {
Box::new(
repo.get_changeset_by_nodeid(&changeset_hash)
.and_then(move |changeset| repo.get_manifest_by_nodeid(changeset.manifestid()))
.map_err(|err| err.context("failed to get manifest from changeset").into())
.and_then(|manifest| Self::read_manifest(&manifest)),
)
}
/// Read the config repo and generate RepoConfigs based on it
pub fn read_revlog_config_repo(
repo: RevlogRepo,
changeset_hash: NodeHash,
) -> Box<Future<Item = Self, Error = Error> + Send> {
Box::new(

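Callers now pick an entry point by repo flavour: read_config_repo for a
BlobRepo, read_revlog_config_repo for the one remaining revlog user (the
server's config repo, until blobimport is full-fidelity). A sketch, with the
path and changeset hash illustrative:

extern crate blobrepo;
extern crate futures;
extern crate mercurial_types;
extern crate metaconfig;

use std::path::Path;

use blobrepo::BlobRepo;
use futures::Future;
use mercurial_types::NodeHash;
use metaconfig::RepoConfigs;

fn load_configs(changeset_hash: NodeHash) -> RepoConfigs {
    let repo = BlobRepo::new_rocksdb(Path::new("/tmp/config-repo"))
        .expect("couldn't open config repo");
    RepoConfigs::read_config_repo(repo, changeset_hash)
        .wait()
        .expect("failed to read configs")
}
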
View File

@ -10,7 +10,6 @@
//! changeset and memoized for efficiency.
use std::cmp;
use std::marker::PhantomData;
use std::sync::Arc;
use std::usize;
@ -19,7 +18,8 @@ use futures::future::{self, Future};
use futures::stream::{self, Stream};
use asyncmemo::{Asyncmemo, Filler};
use mercurial_types::{NodeHash, Repo};
use blobrepo::BlobRepo;
use mercurial_types::NodeHash;
use nodehashkey::Key;
@ -33,17 +33,11 @@ pub struct Generation(u64);
/// Cache of generation numbers
///
/// Allows generation numbers for a changeset to be computed lazily and cached.
pub struct RepoGenCache<R>
where
R: Repo,
{
cache: Asyncmemo<GenFiller<R>>,
pub struct RepoGenCache {
cache: Asyncmemo<GenFiller>,
}
impl<R> Clone for RepoGenCache<R>
where
R: Repo,
{
impl Clone for RepoGenCache {
fn clone(&self) -> Self {
Self {
cache: self.cache.clone(),
@ -51,10 +45,7 @@ where
}
}
impl<R> RepoGenCache<R>
where
R: Repo,
{
impl RepoGenCache {
/// Construct a new `RepoGenCache`, bounded to `sizelimit` bytes.
pub fn new(sizelimit: usize) -> Self {
RepoGenCache {
@ -65,30 +56,23 @@ where
/// Get a `Future` for a `Generation` number for a given changeset in a repo.
pub fn get(
&self,
repo: &Arc<R>,
repo: &Arc<BlobRepo>,
nodeid: NodeHash,
) -> impl Future<Item = Generation, Error = Error> + Send {
self.cache.get((repo, nodeid))
}
}
pub struct GenFiller<R> {
_phantom: PhantomData<R>,
}
pub struct GenFiller {}
impl<R> GenFiller<R> {
impl GenFiller {
fn new() -> Self {
GenFiller {
_phantom: PhantomData,
}
GenFiller {}
}
}
impl<R> Filler for GenFiller<R>
where
R: Repo,
{
type Key = Key<R>;
impl Filler for GenFiller {
type Key = Key<BlobRepo>;
type Value = Box<Future<Item = Generation, Error = Error> + Send>;
fn fill(&self, cache: &Asyncmemo<Self>, &Key(ref repo, ref nodeid): &Self::Key) -> Self::Value {

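RepoGenCache is likewise no longer parameterized over a repo type; it caches
generation numbers keyed on (Arc<BlobRepo>, NodeHash). A sketch, assuming the
linear test fixture used elsewhere in this diff:

use std::sync::Arc;

use futures::Future;
use mercurial_types::NodeHash;
use repoinfo::RepoGenCache;

fn generation_of(node: NodeHash) {
    let repo = Arc::new(linear::getrepo());
    let gen_cache = RepoGenCache::new(1024); // sizelimit, in bytes
    let _gen = gen_cache.get(&repo, node)
        .wait()
        .expect("failed to compute generation");
}
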
View File

@ -19,6 +19,7 @@ extern crate heapsize;
#[macro_use]
extern crate heapsize_derive;
extern crate blobrepo;
extern crate futures_ext;
extern crate mercurial_types;

View File

@ -16,27 +16,25 @@ use futures::{Async, Poll};
use futures::future::Future;
use futures::stream::{iter_ok, Stream};
use mercurial_types::{Changeset, NodeHash, Repo};
use blobrepo::BlobRepo;
use mercurial_types::{Changeset, NodeHash};
use repoinfo::{Generation, RepoGenCache};
use IntersectNodeStream;
use NodeStream;
use errors::*;
pub struct AncestorsNodeStream<R>
where
R: Repo,
{
repo: Arc<R>,
repo_generation: RepoGenCache<R>,
pub struct AncestorsNodeStream {
repo: Arc<BlobRepo>,
repo_generation: RepoGenCache,
next_generation: BTreeMap<Generation, HashSet<NodeHash>>,
pending_changesets: Box<Stream<Item = (NodeHash, Generation), Error = Error> + Send>,
drain: IntoIter<NodeHash>,
}
fn make_pending<R: Repo>(
repo: Arc<R>,
repo_generation: RepoGenCache<R>,
fn make_pending(
repo: Arc<BlobRepo>,
repo_generation: RepoGenCache,
hashes: IntoIter<NodeHash>,
) -> Box<Stream<Item = (NodeHash, Generation), Error = Error> + Send> {
let size = hashes.size_hint().0;
@ -62,11 +60,8 @@ fn make_pending<R: Repo>(
)
}
impl<R> AncestorsNodeStream<R>
where
R: Repo,
{
pub fn new(repo: &Arc<R>, repo_generation: RepoGenCache<R>, hash: NodeHash) -> Self {
impl AncestorsNodeStream {
pub fn new(repo: &Arc<BlobRepo>, repo_generation: RepoGenCache, hash: NodeHash) -> Self {
let node_set: HashSet<NodeHash> = hashset!{hash};
AncestorsNodeStream {
repo: repo.clone(),
@ -86,10 +81,7 @@ where
}
}
impl<R> Stream for AncestorsNodeStream<R>
where
R: Repo,
{
impl Stream for AncestorsNodeStream {
type Item = NodeHash;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -132,22 +124,19 @@ where
current_generation.clone().into_iter(),
);
self.drain = current_generation.into_iter();
Ok(Async::Ready(Some(
self.drain
.next()
.expect("Cannot create a generation without at least one node hash"),
)))
Ok(Async::Ready(Some(self.drain.next().expect(
"Cannot create a generation without at least one node hash",
))))
}
}
pub fn common_ancestors<I, R>(
repo: &Arc<R>,
repo_generation: RepoGenCache<R>,
pub fn common_ancestors<I>(
repo: &Arc<BlobRepo>,
repo_generation: RepoGenCache,
nodes: I,
) -> Box<NodeStream>
where
I: IntoIterator<Item = NodeHash>,
R: Repo,
{
let nodes_iter = nodes.into_iter().map({
let repo_generation = repo_generation.clone();
@ -156,14 +145,13 @@ where
IntersectNodeStream::new(repo, repo_generation, nodes_iter).boxed()
}
pub fn greatest_common_ancestor<I, R>(
repo: &Arc<R>,
repo_generation: RepoGenCache<R>,
pub fn greatest_common_ancestor<I>(
repo: &Arc<BlobRepo>,
repo_generation: RepoGenCache,
nodes: I,
) -> Box<NodeStream>
where
I: IntoIterator<Item = NodeHash>,
R: Repo,
{
Box::new(common_ancestors(repo, repo_generation, nodes).take(1))
}
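
Driving the de-genericized revset constructors now looks like this (a sketch;
the hashes are illustrative, and string_to_nodehash is the test helper that
appears later in this diff):

use std::sync::Arc;

use repoinfo::RepoGenCache;

fn example_gca() -> Box<NodeStream> {
    let repo = Arc::new(linear::getrepo());
    let repo_generation = RepoGenCache::new(10);
    let a = string_to_nodehash("a5ffa77602a066db7d5cfb9fb5823a0895717c5a");
    let b = string_to_nodehash("0000000000000000000000000000000000000000");
    greatest_common_ancestor(&repo, repo_generation, vec![a, b])
}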

View File

@ -4,11 +4,11 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use blobrepo::BlobRepo;
use futures::Async;
use futures::Poll;
use futures::stream::Stream;
use mercurial_types::{NodeHash, Repo};
use mercurial_types::NodeHash;
use repoinfo::{Generation, RepoGenCache};
use std::boxed::Box;
use std::collections::HashMap;
@ -29,10 +29,9 @@ pub struct IntersectNodeStream {
}
impl IntersectNodeStream {
pub fn new<I, R>(repo: &Arc<R>, repo_generation: RepoGenCache<R>, inputs: I) -> Self
pub fn new<I>(repo: &Arc<BlobRepo>, repo_generation: RepoGenCache, inputs: I) -> Self
where
I: IntoIterator<Item = Box<NodeStream>>,
R: Repo,
{
let hash_and_gen = inputs.into_iter().map({
move |i| {

View File

@ -5,6 +5,7 @@
// GNU General Public License version 2 or any later version.
extern crate asyncmemo;
extern crate blobrepo;
#[macro_use]
extern crate failure_ext as failure;
extern crate futures;
@ -47,8 +48,6 @@ pub use range::RangeNodeStream;
#[cfg(test)]
extern crate ascii;
#[cfg(test)]
extern crate blobrepo;
#[cfg(test)]
extern crate branch_even;
#[cfg(test)]
extern crate branch_uneven;

View File

@ -13,7 +13,8 @@ use rand::thread_rng;
use std::collections::HashSet;
use std::sync::Arc;
use mercurial_types::{NodeHash, Repo};
use blobrepo::BlobRepo;
use mercurial_types::NodeHash;
use repoinfo::RepoGenCache;
use branch_even;
@ -46,9 +47,8 @@ pub struct RevsetSpec {
}
impl RevsetSpec {
pub fn add_hashes<R, G>(&mut self, repo: &R, random: &mut G)
pub fn add_hashes<G>(&mut self, repo: &BlobRepo, random: &mut G)
where
R: Repo,
G: Rng,
{
let mut all_changesets_executor = spawn(repo.get_changesets());
@ -106,10 +106,7 @@ impl RevsetSpec {
output.pop().expect("No revisions").into_iter().collect()
}
pub fn as_revset<R>(&self, repo: Arc<R>, repo_generation: RepoGenCache<R>) -> Box<NodeStream>
where
R: Repo,
{
pub fn as_revset(&self, repo: Arc<BlobRepo>, repo_generation: RepoGenCache) -> Box<NodeStream> {
let mut output: Vec<Box<NodeStream>> = Vec::with_capacity(self.rp_entries.len());
for entry in self.rp_entries.iter() {
let next_node = ValidateNodeStream::new(
@ -212,10 +209,7 @@ impl Arbitrary for RevsetSpec {
// is a SetDifference by pure chance.
}
fn match_hashset_to_revset<R>(repo: Arc<R>, mut set: RevsetSpec) -> bool
where
R: Repo,
{
fn match_hashset_to_revset(repo: Arc<BlobRepo>, mut set: RevsetSpec) -> bool {
let repo_generation = RepoGenCache::new(10);
set.add_hashes(&*repo, &mut thread_rng());

View File

@ -14,7 +14,8 @@ use futures::{Async, Poll};
use futures::future::Future;
use futures::stream::{self, iter_ok, Stream};
use mercurial_types::{NodeHash, Repo};
use blobrepo::BlobRepo;
use mercurial_types::NodeHash;
use repoinfo::{Generation, RepoGenCache};
use NodeStream;
@ -32,12 +33,9 @@ struct ParentChild {
child: HashGen,
}
pub struct RangeNodeStream<R>
where
R: Repo,
{
repo: Arc<R>,
repo_generation: RepoGenCache<R>,
pub struct RangeNodeStream {
repo: Arc<BlobRepo>,
repo_generation: RepoGenCache,
start_node: NodeHash,
start_generation: Box<Stream<Item = Generation, Error = Error> + Send>,
children: HashMap<HashGen, HashSet<HashGen>>,
@ -47,9 +45,9 @@ where
drain: Option<IntoIter<NodeHash>>,
}
fn make_pending<R: Repo>(
repo: Arc<R>,
repo_generation: RepoGenCache<R>,
fn make_pending(
repo: Arc<BlobRepo>,
repo_generation: RepoGenCache,
child: HashGen,
) -> Box<Stream<Item = ParentChild, Error = Error> + Send> {
Box::new(
@ -58,34 +56,27 @@ fn make_pending<R: Repo>(
repo.get_changeset_by_nodeid(&child.hash)
.map(move |cs| (child, cs.parents().clone()))
.map_err(|err| err.context(ErrorKind::ParentsFetchFailed).into())
}.map(|(child, parents)| {
iter_ok::<_, Error>(iter::repeat(child).zip(parents.into_iter()))
})
}.map(|(child, parents)| iter_ok::<_, Error>(iter::repeat(child).zip(parents.into_iter())))
.flatten_stream()
.and_then(move |(child, parent_hash)| {
repo_generation
.get(&repo, parent_hash)
.map(move |gen_id| {
ParentChild {
child,
parent: HashGen {
hash: parent_hash,
generation: gen_id,
},
}
.map(move |gen_id| ParentChild {
child,
parent: HashGen {
hash: parent_hash,
generation: gen_id,
},
})
.map_err(|err| err.context(ErrorKind::GenerationFetchFailed).into())
}),
)
}
impl<R> RangeNodeStream<R>
where
R: Repo,
{
impl RangeNodeStream {
pub fn new(
repo: &Arc<R>,
repo_generation: RepoGenCache<R>,
repo: &Arc<BlobRepo>,
repo_generation: RepoGenCache,
start_node: NodeHash,
end_node: NodeHash,
) -> Self {
@ -164,10 +155,7 @@ where
}
}
impl<R> Stream for RangeNodeStream<R>
where
R: Repo,
{
impl Stream for RangeNodeStream {
type Item = NodeHash;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {

View File

@ -4,9 +4,10 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use blobrepo::BlobRepo;
use futures::future::Future;
use futures::stream::Stream;
use mercurial_types::{NodeHash, Repo};
use mercurial_types::NodeHash;
use repoinfo::{Generation, RepoGenCache};
use std::boxed::Box;
use std::sync::Arc;
@ -18,14 +19,11 @@ use futures::{Async, Poll};
pub type InputStream = Box<Stream<Item = (NodeHash, Generation), Error = Error> + 'static + Send>;
pub fn add_generations<R>(
pub fn add_generations(
stream: Box<NodeStream>,
repo_generation: RepoGenCache<R>,
repo: Arc<R>,
) -> InputStream
where
R: Repo,
{
repo_generation: RepoGenCache,
repo: Arc<BlobRepo>,
) -> InputStream {
let stream = stream.and_then(move |node_hash| {
repo_generation
.get(&repo, node_hash)

View File

@ -4,9 +4,10 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use blobrepo::BlobRepo;
use futures::{Async, Poll};
use futures::stream::Stream;
use mercurial_types::{NodeHash, Repo};
use mercurial_types::NodeHash;
use repoinfo::{Generation, RepoGenCache};
use std::boxed::Box;
use std::collections::HashSet;
@ -28,15 +29,12 @@ pub struct SetDifferenceNodeStream {
}
impl SetDifferenceNodeStream {
pub fn new<R>(
repo: &Arc<R>,
repo_generation: RepoGenCache<R>,
pub fn new(
repo: &Arc<BlobRepo>,
repo_generation: RepoGenCache,
keep_input: Box<NodeStream>,
remove_input: Box<NodeStream>,
) -> SetDifferenceNodeStream
where
R: Repo,
{
) -> SetDifferenceNodeStream {
SetDifferenceNodeStream {
keep_input: add_generations(keep_input, repo_generation.clone(), repo.clone()),
next_keep: Async::NotReady,

View File

@ -6,14 +6,14 @@
use std::boxed::Box;
use blobrepo::BlobRepo;
use failure::Error;
use futures::{Async, Poll};
use futures::future::Future;
use futures::stream::Stream;
use mercurial_types::{NodeHash, Repo};
use mercurial_types::NodeHash;
use NodeStream;
use errors::*;
pub struct SingleNodeHash {
nodehash: Option<NodeHash>,
@ -21,12 +21,8 @@ pub struct SingleNodeHash {
}
impl SingleNodeHash {
pub fn new<R: Repo>(nodehash: NodeHash, repo: &R) -> Self {
let exists = Box::new(
repo.changeset_exists(&nodehash)
.map_err(move |e| e.context(ErrorKind::RepoError(nodehash)))
.from_err(),
);
pub fn new(nodehash: NodeHash, repo: &BlobRepo) -> Self {
let exists = Box::new(repo.changeset_exists(&nodehash));
let nodehash = Some(nodehash);
SingleNodeHash { nodehash, exists }
}
@ -59,7 +55,6 @@ impl Stream for SingleNodeHash {
#[cfg(test)]
mod test {
use super::*;
use blobrepo::{BlobRepo, MemBlobState};
use linear;
use repoinfo::RepoGenCache;
use std::sync::Arc;
@ -74,7 +69,7 @@ mod test {
&repo,
);
let repo_generation: RepoGenCache<BlobRepo<MemBlobState>> = RepoGenCache::new(10);
let repo_generation = RepoGenCache::new(10);
assert_node_sequence(
repo_generation,
@ -91,7 +86,7 @@ mod test {
let repo = Arc::new(linear::getrepo());
let nodehash = string_to_nodehash("0000000000000000000000000000000000000000");
let nodestream = SingleNodeHash::new(nodehash, &repo).boxed();
let repo_generation: RepoGenCache<BlobRepo<MemBlobState>> = RepoGenCache::new(10);
let repo_generation = RepoGenCache::new(10);
assert_node_sequence(repo_generation, &repo, vec![].into_iter(), nodestream);
}

View File

@ -6,10 +6,10 @@
use NodeStream;
use ascii::AsciiString;
use blobrepo::BlobRepo;
use futures::Future;
use futures::executor::spawn;
use mercurial_types::NodeHash;
use mercurial_types::Repo;
use repoinfo::RepoGenCache;
use std::collections::HashSet;
use std::sync::Arc;
@ -22,14 +22,13 @@ pub fn string_to_nodehash(hash: &'static str) -> NodeHash {
/// Accounting for reordering within generations, ensure that a NodeStream gives the expected
/// NodeHashes for testing.
pub fn assert_node_sequence<I, R>(
repo_generation: RepoGenCache<R>,
repo: &Arc<R>,
pub fn assert_node_sequence<I>(
repo_generation: RepoGenCache,
repo: &Arc<BlobRepo>,
hashes: I,
stream: Box<NodeStream>,
) where
I: IntoIterator<Item = NodeHash>,
R: Repo,
{
let mut nodestream = spawn(stream);
let mut received_hashes = HashSet::new();

View File

@ -4,6 +4,12 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
use blobrepo::BlobRepo;
use futures::Async;
use futures::Poll;
use futures::stream::Stream;
use mercurial_types::NodeHash;
use repoinfo::{Generation, RepoGenCache};
use std::boxed::Box;
use std::collections::HashSet;
use std::collections::hash_set::IntoIter;
@ -12,11 +18,6 @@ use std::mem::replace;
use std::sync::Arc;
use failure::Error;
use futures::Async;
use futures::Poll;
use futures::stream::Stream;
use mercurial_types::{NodeHash, Repo};
use repoinfo::{Generation, RepoGenCache};
use NodeStream;
use setcommon::*;
@ -29,10 +30,9 @@ pub struct UnionNodeStream {
}
impl UnionNodeStream {
pub fn new<I, R>(repo: &Arc<R>, repo_generation: RepoGenCache<R>, inputs: I) -> Self
pub fn new<I>(repo: &Arc<BlobRepo>, repo_generation: RepoGenCache, inputs: I) -> Self
where
I: IntoIterator<Item = Box<NodeStream>>,
R: Repo,
{
let hash_and_gen = inputs.into_iter().map({
move |i| {

View File

@ -7,10 +7,11 @@
use std::collections::HashSet;
use std::sync::Arc;
use blobrepo::BlobRepo;
use failure::Error;
use futures::{Async, Poll};
use futures::stream::Stream;
use mercurial_types::{NodeHash, Repo};
use mercurial_types::NodeHash;
use repoinfo::{Generation, RepoGenCache};
use NodeStream;
@ -27,14 +28,11 @@ pub struct ValidateNodeStream {
}
impl ValidateNodeStream {
pub fn new<R>(
pub fn new(
wrapped: Box<NodeStream>,
repo: &Arc<R>,
repo_generation: RepoGenCache<R>,
) -> ValidateNodeStream
where
R: Repo,
{
repo: &Arc<BlobRepo>,
repo_generation: RepoGenCache,
) -> ValidateNodeStream {
ValidateNodeStream {
wrapped: add_generations(wrapped, repo_generation, repo.clone()),
last_generation: None,

View File

@ -49,7 +49,7 @@ mod listener;
use std::io;
use std::panic;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
@ -68,13 +68,13 @@ use slog_logview::LogViewDrain;
use bytes::Bytes;
use hgproto::{sshproto, HgProtoHandler};
use mercurial::RevlogRepo;
use metaconfig::RepoConfigs;
use metaconfig::repoconfig::RepoType;
use errors::*;
use listener::{ssh_server_mux, Stdio};
use repo::OpenableRepoType;
struct SenderBytesWrite {
chan: Wait<mpsc::Sender<Bytes>>,
@ -188,7 +188,10 @@ fn start_thrift_service<'a>(
}
fn get_config<'a>(logger: &Logger, matches: &ArgMatches<'a>) -> Result<RepoConfigs> {
let config_repo = RepoType::Revlog(matches.value_of("crpath").unwrap().into()).open()?;
// TODO: This needs to cope with blob repos, too
let mut crpath = PathBuf::from(matches.value_of("crpath").unwrap());
crpath.push(".hg");
let config_repo = RevlogRepo::open(crpath)?;
let node_hash = if let Some(bookmark) = matches.value_of("crbookmark") {
config_repo
@ -202,11 +205,10 @@ fn get_config<'a>(logger: &Logger, matches: &ArgMatches<'a>) -> Result<RepoConfi
info!(
logger,
"Config repository will be read from commit: {}",
node_hash
"Config repository will be read from commit: {}", node_hash
);
RepoConfigs::read_config_repo(config_repo, node_hash)
RepoConfigs::read_revlog_config_repo(config_repo, node_hash)
.from_err()
.wait()
}

View File

@ -20,14 +20,13 @@ use futures_ext::{BoxFuture, FutureExt, StreamExt};
use slog::Logger;
use async_compression::CompressorType;
use mercurial;
use mercurial_bundles::{parts, Bundle2EncodeBuilder};
use mercurial_types::{percent_encode, BoxRepo, Changeset, NodeHash, Parents, Repo, NULL_HASH};
use mercurial_types::{percent_encode, Changeset, NodeHash, Parents, NULL_HASH};
use metaconfig::repoconfig::RepoType;
use hgproto::{self, GetbundleArgs, HgCommandRes, HgCommands};
use blobrepo::{BlobRepo, FilesBlobState, RocksBlobState};
use blobrepo::BlobRepo;
use errors::*;
@ -51,20 +50,20 @@ pub fn init_repo(
Ok((sock, repo))
}
pub trait OpenableRepoType {
fn open(&self) -> Result<Box<Repo + Sync + Send>>;
fn open(&self) -> Result<BlobRepo>;
fn path(&self) -> &Path;
}
impl OpenableRepoType for RepoType {
fn open(&self) -> Result<Box<Repo + Sync + Send>> {
fn open(&self) -> Result<BlobRepo> {
use hgproto::ErrorKind;
use metaconfig::repoconfig::RepoType::*;
let ret = match *self {
Revlog(ref path) => BoxRepo::new(mercurial::RevlogRepo::open(path.join(".hg"))?),
BlobFiles(ref path) => BoxRepo::new(BlobRepo::new(FilesBlobState::new(&path)?)),
BlobRocks(ref path) => BoxRepo::new(BlobRepo::new(RocksBlobState::new(&path)?)),
Revlog(_) => Err(ErrorKind::CantServeRevlogRepo)?,
BlobFiles(ref path) => BlobRepo::new_files(&path)?,
BlobRocks(ref path) => BlobRepo::new_rocksdb(&path)?,
};
Ok(ret)
@ -81,8 +80,8 @@ impl OpenableRepoType for RepoType {
pub struct HgRepo {
path: String,
hgrepo: Arc<Box<Repo + Send + Sync>>,
repo_generation: RepoGenCache<Box<Repo + Send + Sync>>,
hgrepo: Arc<BlobRepo>,
repo_generation: RepoGenCache,
_logger: Logger,
}
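
The server-side effect: RepoType::open() now yields a concrete BlobRepo, and
revlog repos are rejected up front rather than boxed behind the old trait. A
sketch from inside the server crate (Result is the crate's failure-based
alias):

use blobrepo::BlobRepo;
use metaconfig::repoconfig::RepoType;
use repo::OpenableRepoType;

fn open_for_serving(ty: &RepoType) -> Result<BlobRepo> {
    // Revlog(..) now fails with ErrorKind::CantServeRevlogRepo.
    ty.open()
}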

View File

@ -57,13 +57,13 @@ use membookmarks::MemBookmarks;
use mercurial_types::NodeHash;
use memheads::MemHeads;
use memlinknodes::MemLinknodes;
use blobrepo::{BlobRepo, MemBlobState};
use blobrepo::BlobRepo;
use ascii::AsciiString;
use blobstore::Blobstore;
use heads::Heads;
use futures::future::Future;
pub fn getrepo() -> BlobRepo<MemBlobState> {
pub fn getrepo() -> BlobRepo {
let bookmarks: MemBookmarks = MemBookmarks::new();
let heads: MemHeads = MemHeads::new();
let blobs = Memblob::new();
@ -98,7 +98,7 @@ pub fn getrepo() -> BlobRepo<MemBlobState> {
)
rs.writelines(
"""
BlobRepo::new(MemBlobState::new(heads, bookmarks, blobs, linknodes))
BlobRepo::new_memblob(heads, bookmarks, blobs, linknodes)
}
"""
)

View File

@ -17,7 +17,6 @@ use futures::executor::spawn;
use mercurial_types::manifest::{Content, Type};
use mercurial_types::nodehash::NodeHash;
use mercurial_types::path::MPath;
use mercurial_types::repo::Repo;
#[test]
fn check_heads() {
@ -28,8 +27,8 @@ fn check_heads() {
assert!(
if let Some(Ok(hash)) = heads.wait_stream() {
hash ==
NodeHash::from_ascii_str(&AsciiString::from_ascii(
hash
== NodeHash::from_ascii_str(&AsciiString::from_ascii(
"a5ffa77602a066db7d5cfb9fb5823a0895717c5a",
).expect(
"Can't turn string to AsciiString",

View File

@ -7,7 +7,7 @@ setup configuration
$ mkdir repos
$ cat > repos/repo <<CONFIG
> path="$TESTTMP/repo"
> repotype="revlog"
> repotype="blob:files"
> CONFIG
$ hg add repos
adding repos/repo
@ -26,8 +26,8 @@ setup configuration
setup repo
$ hg init repo
$ cd repo
$ hg init repo-hg
$ cd repo-hg
$ touch a
$ hg add a
$ hg ci -ma
@ -40,15 +40,22 @@ setup repo
$ cd $TESTTMP
$ blobimport --blobstore files --linknodes repo-hg repo > /dev/null 2>&1
blobimport currently doesn't handle bookmarks, but the server requires the directory.
$ mkdir -p repo/books
Need a place for the socket to live
$ mkdir -p repo/.hg
setup repo2
$ hg clone repo repo2
$ hg clone repo-hg repo2
updating to branch default
1 files updated, 0 files merged, 0 files removed, 0 files unresolved
$ cd repo2
$ hg pull ../repo
pulling from ../repo
$ hg pull ../repo-hg
pulling from ../repo-hg
searching for changes
no changes found
@ -65,15 +72,17 @@ start mononoke
no changes found
Create a new bookmark and try and send it over the wire
$ cd ../repo
$ hg bookmark test-bookmark
$ hg bookmarks
* test-bookmark 0:3903775176ed
$ cd ../repo2
$ hgmn pull ssh://user@dummy/repo
pulling from ssh://user@dummy/repo
searching for changes
no changes found
adding remote bookmark test-bookmark
$ hg bookmarks
test-bookmark 0:3903775176ed
Test commented out while we have no bookmark support in blobimport and no easy
method to create a fileblob bookmark
# $ cd ../repo
# $ hg bookmark test-bookmark
# $ hg bookmarks
# * test-bookmark 0:3903775176ed
# $ cd ../repo2
# $ hgmn pull ssh://user@dummy/repo
# pulling from ssh://user@dummy/repo
# searching for changes
# no changes found
# adding remote bookmark test-bookmark
# $ hg bookmarks
# test-bookmark 0:3903775176ed

View File

@ -7,7 +7,7 @@ setup configuration
$ mkdir repos
$ cat > repos/repo <<CONFIG
> path="$TESTTMP/repo"
> repotype="revlog"
> repotype="blob:files"
> CONFIG
$ hg add repos
adding repos/repo
@ -26,8 +26,8 @@ setup configuration
setup repo
$ hg init repo
$ cd repo
$ hg init repo-hg
$ cd repo-hg
$ touch a
$ hg add a
$ hg ci -ma
@ -40,15 +40,22 @@ setup repo
$ cd $TESTTMP
$ blobimport --blobstore files --linknodes repo-hg repo > /dev/null 2>&1
blobimport currently doesn't handle bookmarks, but the server requires the directory.
$ mkdir -p repo/books
Need a place for the socket to live
$ mkdir -p repo/.hg
setup repo2
$ hg clone repo repo2
$ hg clone repo-hg repo2
updating to branch default
1 files updated, 0 files merged, 0 files removed, 0 files unresolved
$ cd repo2
$ hg pull ../repo
pulling from ../repo
$ hg pull ../repo-hg
pulling from ../repo-hg
searching for changes
no changes found