mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 16:57:49 +03:00
blobimport: use (effectively) named params for conversion
Summary: Going to add more params here, and this is becoming quite hard to read. Reviewed By: StanislavGlebik Differential Revision: D6096419 fbshipit-source-id: 50f0b99bb6b1804fc01f6a99fc0297c1695dbaa5
This commit is contained in:
parent
a1c6c494fd
commit
92eb9c6e6a
@ -24,59 +24,67 @@ use STATS;
|
||||
use errors::*;
|
||||
use manifest;
|
||||
|
||||
pub(crate) fn convert<H>(
|
||||
revlog: RevlogRepo,
|
||||
sender: SyncSender<BlobstoreEntry>,
|
||||
headstore: H,
|
||||
core: Core,
|
||||
cpupool: Arc<CpuPool>,
|
||||
logger: &Logger,
|
||||
) -> Result<()>
|
||||
pub(crate) struct ConvertContext<H> {
|
||||
pub repo: RevlogRepo,
|
||||
pub sender: SyncSender<BlobstoreEntry>,
|
||||
pub headstore: H,
|
||||
pub core: Core,
|
||||
pub cpupool: Arc<CpuPool>,
|
||||
pub logger: Logger,
|
||||
}
|
||||
|
||||
impl<H> ConvertContext<H>
|
||||
where
|
||||
H: Heads<Key = String>,
|
||||
H::Error: Into<Error>,
|
||||
{
|
||||
let mut core = core;
|
||||
pub fn convert(self) -> Result<()> {
|
||||
let mut core = self.core;
|
||||
let logger_owned = self.logger;
|
||||
let logger = &logger_owned;
|
||||
let cpupool = self.cpupool;
|
||||
let headstore = self.headstore;
|
||||
|
||||
// Generate stream of changesets. For each changeset, save the cs blob, and the manifest blob,
|
||||
// and the files.
|
||||
let changesets = revlog.changesets()
|
||||
.map_err(Error::from)
|
||||
.enumerate()
|
||||
.map({
|
||||
let revlog = revlog.clone();
|
||||
let sender = sender.clone();
|
||||
move |(seq, csid)| {
|
||||
debug!(logger, "{}: changeset {}", seq, csid);
|
||||
STATS::changesets.add_value(1);
|
||||
copy_changeset(revlog.clone(), sender.clone(), csid)
|
||||
}
|
||||
}) // Stream<Future<()>>
|
||||
.map(|copy| cpupool.spawn(copy))
|
||||
.buffer_unordered(100);
|
||||
// Generate stream of changesets. For each changeset, save the cs blob, and the manifest
|
||||
// blob, and the files.
|
||||
let changesets = self.repo.changesets()
|
||||
.map_err(Error::from)
|
||||
.enumerate()
|
||||
.map({
|
||||
let repo = self.repo.clone();
|
||||
let sender = self.sender.clone();
|
||||
move |(seq, csid)| {
|
||||
debug!(logger, "{}: changeset {}", seq, csid);
|
||||
STATS::changesets.add_value(1);
|
||||
copy_changeset(repo.clone(), sender.clone(), csid)
|
||||
}
|
||||
}) // Stream<Future<()>>
|
||||
.map(|copy| cpupool.spawn(copy))
|
||||
.buffer_unordered(100);
|
||||
|
||||
let heads = revlog
|
||||
.get_heads()
|
||||
.map_err(Error::from)
|
||||
.map_err(|err| Error::with_chain(err, "Failed get heads"))
|
||||
.map(|h| {
|
||||
debug!(logger, "head {}", h);
|
||||
STATS::heads.add_value(1);
|
||||
headstore
|
||||
.add(&format!("{}", h))
|
||||
.map_err(Into::into)
|
||||
.map_err({
|
||||
move |err| Error::with_chain(err, format!("Failed to create head {}", h))
|
||||
})
|
||||
})
|
||||
.buffer_unordered(100);
|
||||
let heads = self.repo
|
||||
.get_heads()
|
||||
.map_err(Error::from)
|
||||
.map_err(|err| Error::with_chain(err, "Failed get heads"))
|
||||
.map(|h| {
|
||||
debug!(logger, "head {}", h);
|
||||
STATS::heads.add_value(1);
|
||||
headstore
|
||||
.add(&format!("{}", h))
|
||||
.map_err(Into::into)
|
||||
.map_err({
|
||||
move |err| Error::with_chain(err, format!("Failed to create head {}", h))
|
||||
})
|
||||
})
|
||||
.buffer_unordered(100);
|
||||
|
||||
let convert = changesets.select(heads).for_each(|_| Ok(()));
|
||||
let convert = changesets.select(heads).for_each(|_| Ok(()));
|
||||
|
||||
core.run(convert)?;
|
||||
core.run(convert)?;
|
||||
|
||||
info!(logger, "parsed everything, waiting for io");
|
||||
Ok(())
|
||||
info!(logger, "parsed everything, waiting for io");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Copy a changeset and its manifest into the blobstore
|
||||
|
@ -162,7 +162,15 @@ where
|
||||
let repo = open_repo(&input)?;
|
||||
|
||||
info!(logger, "Converting: {:?}", input);
|
||||
let res = convert::convert(repo, sender, headstore, core, cpupool, logger);
|
||||
let convert_context = convert::ConvertContext {
|
||||
repo,
|
||||
sender,
|
||||
headstore,
|
||||
core,
|
||||
cpupool,
|
||||
logger: logger.clone(),
|
||||
};
|
||||
let res = convert_context.convert();
|
||||
iothread.join().expect("failed to join io thread")?;
|
||||
res
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user