mononoke: revert D8959535

Summary:
It makes startup unbearably slow, and doesn't add any benefits at all. Revert
it

Reviewed By: purplefox

Differential Revision: D9358741

fbshipit-source-id: 26469941304f737c856a6ffca5e577848ad30955
This commit is contained in:
Stanislau Hlebik 2018-08-16 03:03:01 -07:00 committed by Facebook Github Bot
parent b4ebd9f8ac
commit 2c8d98447d
4 changed files with 31 additions and 28 deletions

View File

@ -259,7 +259,7 @@ macro_rules! impl_bonsai_hg_mapping {
} else {
Ok(result)
}
}).boxify()
})
}
fn add(&self, entry: BonsaiHgMappingEntry) -> BoxFuture<bool, Error> {
@ -292,7 +292,7 @@ macro_rules! impl_bonsai_hg_mapping {
match entry_by_bcs.or(entry_by_hgcs) {
Some(ref stored_entry) if stored_entry == &entry => Ok(false),
Some(stored_entry) => {
Err(ErrorKind::ConflictingEntries(stored_entry.clone(), entry.clone())
Err(ErrorKind::ConflictingEntries(stored_entry.clone(), entry)
.into())
}
_ => Err(err.into()),
@ -300,7 +300,7 @@ macro_rules! impl_bonsai_hg_mapping {
}
Err(err) => Err(err.into()),
}
}).boxify()
})
}
}

View File

@ -45,7 +45,7 @@ use diesel::result::{DatabaseErrorKind, Error as DieselError};
use diesel::sql_types::HasSqlType;
use failure::ResultExt;
use futures_ext::{asynchronize, BoxFuture, FutureExt};
use futures_ext::{asynchronize, BoxFuture};
use mercurial_types::RepositoryId;
use mononoke_types::ChangesetId;
use mononoke_types::sql_types::ChangesetIdSql;
@ -226,7 +226,7 @@ macro_rules! impl_changesets {
} else {
Ok(changeset)
}
}).boxify()
})
}
/// Insert a new changeset into this table. Checks that all parents are already in
@ -346,7 +346,7 @@ macro_rules! impl_changesets {
Ok(true)
})
})
}).boxify()
})
}
}

View File

@ -35,7 +35,7 @@ use diesel::sql_types::HasSqlType;
use failure::{Error, Result, ResultExt};
use filenodes::{FilenodeInfo, Filenodes, blake2_path_hash};
use futures::{Future, Stream};
use futures_ext::{asynchronize, BoxFuture, BoxStream, FutureExt};
use futures_ext::{asynchronize, BoxFuture, BoxStream};
use mercurial_types::{HgFileNodeId, RepoPath, RepositoryId};
use mercurial_types::sql_types::HgFileNodeIdSql;
use stats::Timeseries;
@ -167,20 +167,15 @@ macro_rules! impl_filenodes {
let db = self.clone();
let insert_chunk_size = self.insert_chunk_size;
filenodes.chunks(insert_chunk_size)
.and_then(move |filenodes| {
asynchronize(move || {
filenodes.chunks(insert_chunk_size).and_then(move |filenodes| {
STATS::adds.add_value(filenodes.len() as i64);
asynchronize({
let db = db.clone();
move || {
let connection = db.get_master_conn()?;
Self::do_insert(&connection, &filenodes, &repo_id)
}
})
let connection = db.get_master_conn()?;
Self::do_insert(&connection, &filenodes, &repo_id)
})
.for_each(|()| Ok(()))
.from_err()
.boxify()
})
}
fn get_filenode(
@ -207,7 +202,7 @@ macro_rules! impl_filenodes {
} else {
Ok(filenode_info)
}
}).boxify()
})
}
fn get_all_filenodes(
@ -233,7 +228,7 @@ macro_rules! impl_filenodes {
}
Ok(res)
}).boxify()
})
}
}

View File

@ -21,7 +21,6 @@ extern crate futures;
extern crate quickcheck;
extern crate tokio;
extern crate tokio_io;
extern crate tokio_threadpool;
use bytes::Bytes;
use futures::{future, Async, AsyncSink, Future, IntoFuture, Poll, Sink, Stream};
@ -29,7 +28,6 @@ use futures::sync::{mpsc, oneshot};
use tokio::timer::{Deadline, DeadlineError};
use tokio_io::AsyncWrite;
use tokio_io::codec::{Decoder, Encoder};
use tokio_threadpool::{blocking, BlockingError};
use std::{io as std_io, fmt::Debug, time::{Duration, Instant}};
@ -551,15 +549,25 @@ macro_rules! ensure_boxstream {
/// Ok(())
/// })
/// ```
pub fn asynchronize<Func, T, E>(func: Func) -> impl Future<Item = T, Error = E>
pub fn asynchronize<Func, T, E, R>(f: Func) -> BoxFuture<T, E>
where
Func: FnMut() -> Result<T, E> + Clone,
E: From<BlockingError>,
Func: FnOnce() -> R + Send + 'static,
E: From<futures::Canceled> + Send + 'static,
R: IntoFuture<Item = T, Error = E> + 'static,
T: Send + 'static,
<R as IntoFuture>::Future: Send,
{
let mut func = func;
future::poll_fn(move || blocking(&mut func))
.map_err(E::from)
.and_then(|res| res) // flatten Ok(res) => res
let (tx, rx) = oneshot::channel();
let fut = future::lazy(f).then(|res| {
let _ = tx.send(res);
Ok(())
});
future::lazy(move || {
let _ = tokio::spawn(fut);
rx.from_err().and_then(|v| v)
}).boxify()
}
/// Simple adapter from `Sink` interface to `AsyncWrite` interface.