2019-10-11 23:51:17 +03:00
|
|
|
/*
|
|
|
|
* Copyright (c) Facebook, Inc. and its affiliates.
|
|
|
|
*
|
|
|
|
* This software may be used and distributed according to the terms of the
|
2020-02-11 13:42:43 +03:00
|
|
|
* GNU General Public License version 2.
|
2019-10-11 23:51:17 +03:00
|
|
|
*/
|
2018-08-08 15:24:56 +03:00
|
|
|
|
|
|
|
#![deny(warnings)]
|
|
|
|
|
2020-04-16 12:12:29 +03:00
|
|
|
use anyhow::{Error, Result};
|
2018-08-08 15:24:56 +03:00
|
|
|
use blobrepo::BlobRepo;
|
2020-04-08 18:56:17 +03:00
|
|
|
use blobstore::Loadable;
|
2019-05-20 17:54:53 +03:00
|
|
|
use bookmarks::BookmarkName;
|
2019-04-08 12:49:27 +03:00
|
|
|
use cloned::cloned;
|
2018-11-30 21:11:37 +03:00
|
|
|
use context::CoreContext;
|
2020-04-11 14:25:18 +03:00
|
|
|
use futures::{FutureExt, TryFutureExt};
|
2020-03-18 19:13:05 +03:00
|
|
|
use futures_ext::{spawn_future, BoxFuture, FutureExt as OldFutureExt};
|
2020-03-03 08:00:03 +03:00
|
|
|
use futures_old::{Future, Stream};
|
2020-03-18 19:13:05 +03:00
|
|
|
use hooks::{hook_loader::load_hooks, HookManager, HookOutcome};
|
2020-04-11 14:25:18 +03:00
|
|
|
use hooks_content_stores::blobrepo_text_only_fetcher;
|
2018-08-08 15:24:56 +03:00
|
|
|
use manifold::{ManifoldHttpClient, PayloadRange};
|
2019-03-15 00:16:15 +03:00
|
|
|
use mercurial_types::HgChangesetId;
|
2019-01-31 11:31:11 +03:00
|
|
|
use metaconfig_types::RepoConfig;
|
2018-09-26 14:46:41 +03:00
|
|
|
use mononoke_types::ChangesetId;
|
2018-08-08 15:24:56 +03:00
|
|
|
use revset::AncestorsNodeStream;
|
2019-11-15 18:04:51 +03:00
|
|
|
use scuba_ext::ScubaSampleBuilder;
|
2019-10-16 21:02:19 +03:00
|
|
|
use slog::{debug, info};
|
2019-04-17 17:56:57 +03:00
|
|
|
use std::collections::HashSet;
|
2018-08-08 15:24:56 +03:00
|
|
|
use std::sync::Arc;
|
2019-11-15 09:02:57 +03:00
|
|
|
use thiserror::Error;
|
2018-08-08 15:24:56 +03:00
|
|
|
|
|
|
|
pub struct Tailer {
|
2018-11-30 21:11:37 +03:00
|
|
|
ctx: CoreContext,
|
2019-01-09 14:49:56 +03:00
|
|
|
repo: BlobRepo,
|
2018-08-08 15:24:56 +03:00
|
|
|
hook_manager: Arc<HookManager>,
|
2019-05-20 17:54:53 +03:00
|
|
|
bookmark: BookmarkName,
|
2018-08-08 15:24:56 +03:00
|
|
|
last_rev_key: String,
|
|
|
|
manifold_client: ManifoldHttpClient,
|
2019-04-17 17:56:57 +03:00
|
|
|
excludes: HashSet<ChangesetId>,
|
2018-08-08 15:24:56 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Tailer {
|
|
|
|
pub fn new(
|
2018-11-30 21:11:37 +03:00
|
|
|
ctx: CoreContext,
|
2019-01-09 14:49:56 +03:00
|
|
|
repo: BlobRepo,
|
2018-08-08 15:24:56 +03:00
|
|
|
config: RepoConfig,
|
2019-05-20 17:54:53 +03:00
|
|
|
bookmark: BookmarkName,
|
2018-08-08 15:24:56 +03:00
|
|
|
manifold_client: ManifoldHttpClient,
|
2019-04-17 17:56:57 +03:00
|
|
|
excludes: HashSet<ChangesetId>,
|
2019-07-11 16:38:11 +03:00
|
|
|
disabled_hooks: &HashSet<String>,
|
2018-08-08 15:24:56 +03:00
|
|
|
) -> Result<Tailer> {
|
2020-04-08 18:56:17 +03:00
|
|
|
let content_fetcher = blobrepo_text_only_fetcher(repo.clone(), config.hook_max_file_size);
|
2018-08-08 15:24:56 +03:00
|
|
|
|
|
|
|
let mut hook_manager = HookManager::new(
|
2020-01-27 15:28:33 +03:00
|
|
|
ctx.fb,
|
2020-04-08 18:56:17 +03:00
|
|
|
content_fetcher,
|
2018-11-22 14:28:28 +03:00
|
|
|
Default::default(),
|
2019-11-15 18:04:51 +03:00
|
|
|
ScubaSampleBuilder::with_discard(),
|
2018-08-08 15:24:56 +03:00
|
|
|
);
|
|
|
|
|
2019-09-14 06:16:08 +03:00
|
|
|
load_hooks(ctx.fb, &mut hook_manager, config, disabled_hooks)?;
|
2018-08-08 15:24:56 +03:00
|
|
|
|
|
|
|
let repo_id = repo.get_repoid().id();
|
|
|
|
let last_rev_key = format!("{}{}", "__mononoke_hook_tailer_last_rev.", repo_id).to_string();
|
|
|
|
|
|
|
|
Ok(Tailer {
|
2018-11-30 21:11:37 +03:00
|
|
|
ctx,
|
2018-08-08 15:24:56 +03:00
|
|
|
repo,
|
|
|
|
hook_manager: Arc::new(hook_manager),
|
|
|
|
bookmark,
|
|
|
|
last_rev_key,
|
|
|
|
manifold_client,
|
2019-04-17 17:56:57 +03:00
|
|
|
excludes,
|
2018-08-08 15:24:56 +03:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn get_last_rev_key(&self) -> String {
|
|
|
|
self.last_rev_key.clone()
|
|
|
|
}
|
|
|
|
|
2019-04-09 18:36:33 +03:00
|
|
|
pub fn run_single_changeset(
|
|
|
|
&self,
|
2020-04-16 12:12:29 +03:00
|
|
|
changeset: ChangesetId,
|
2020-03-18 19:13:05 +03:00
|
|
|
) -> BoxFuture<Vec<HookOutcome>, Error> {
|
2019-10-16 21:02:19 +03:00
|
|
|
cloned!(self.ctx, self.repo, self.hook_manager, self.bookmark,);
|
2020-04-16 12:12:29 +03:00
|
|
|
run_hooks_for_changeset(ctx, repo, hook_manager, bookmark, changeset)
|
2020-03-18 19:13:05 +03:00
|
|
|
.map(|(_, result)| result)
|
2019-04-09 18:36:33 +03:00
|
|
|
.boxify()
|
|
|
|
}
|
|
|
|
|
2020-03-18 19:13:05 +03:00
|
|
|
pub fn run_with_limit(&self, limit: u64) -> BoxFuture<Vec<HookOutcome>, Error> {
|
2018-11-30 21:11:37 +03:00
|
|
|
let ctx = self.ctx.clone();
|
2018-10-22 15:20:40 +03:00
|
|
|
let bm = self.bookmark.clone();
|
|
|
|
let hm = self.hook_manager.clone();
|
2019-04-17 17:56:57 +03:00
|
|
|
let excludes = self.excludes.clone();
|
2018-10-22 15:20:40 +03:00
|
|
|
|
2020-04-16 12:12:29 +03:00
|
|
|
let bm_rev = self.repo.get_bonsai_bookmark(ctx.clone(), &bm).and_then({
|
|
|
|
cloned!(bm);
|
|
|
|
|opt| opt.ok_or(ErrorKind::NoSuchBookmark(bm).into())
|
|
|
|
});
|
2018-10-22 15:20:40 +03:00
|
|
|
|
2019-10-16 21:02:19 +03:00
|
|
|
cloned!(self.ctx, self.repo);
|
2018-10-22 15:20:40 +03:00
|
|
|
bm_rev
|
|
|
|
.and_then(move |bm_rev| {
|
2018-11-30 21:11:37 +03:00
|
|
|
AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), bm_rev)
|
2018-10-22 15:20:40 +03:00
|
|
|
.take(limit)
|
2019-04-17 17:56:57 +03:00
|
|
|
.filter(move |cs| !excludes.contains(cs))
|
2018-10-22 15:20:40 +03:00
|
|
|
.map({
|
|
|
|
move |cs| {
|
2019-10-16 21:02:19 +03:00
|
|
|
cloned!(ctx, bm, hm, repo);
|
|
|
|
run_hooks_for_changeset(ctx, repo, hm, bm, cs)
|
2018-10-22 15:20:40 +03:00
|
|
|
}
|
|
|
|
})
|
|
|
|
.map(spawn_future)
|
|
|
|
.buffered(100)
|
|
|
|
.map(|(_, res)| res)
|
2020-03-18 19:13:05 +03:00
|
|
|
.concat2()
|
2018-10-22 15:20:40 +03:00
|
|
|
})
|
|
|
|
.boxify()
|
|
|
|
}
|
|
|
|
|
2020-03-18 19:13:05 +03:00
|
|
|
pub fn run(&self) -> BoxFuture<Vec<HookOutcome>, Error> {
|
2019-04-17 17:56:57 +03:00
|
|
|
info!(
|
2019-10-16 21:02:19 +03:00
|
|
|
self.ctx.logger(),
|
2019-04-17 17:56:57 +03:00
|
|
|
"Running tailer on bookmark {}",
|
|
|
|
self.bookmark.clone()
|
|
|
|
);
|
2018-08-08 15:24:56 +03:00
|
|
|
|
|
|
|
self.repo
|
2020-04-08 18:56:17 +03:00
|
|
|
.get_bonsai_bookmark(self.ctx.clone(), &self.bookmark.clone())
|
2019-04-17 17:56:57 +03:00
|
|
|
.and_then({
|
|
|
|
cloned!(self.bookmark);
|
|
|
|
|opt| opt.ok_or(ErrorKind::NoSuchBookmark(bookmark).into())
|
|
|
|
})
|
|
|
|
.and_then({
|
|
|
|
cloned!(self.last_rev_key, self.manifold_client);
|
|
|
|
move |current_bm_cs| {
|
|
|
|
manifold_client
|
|
|
|
.read(last_rev_key, PayloadRange::Full)
|
|
|
|
.map(move |opt| (current_bm_cs, opt))
|
|
|
|
}
|
2018-08-08 15:24:56 +03:00
|
|
|
})
|
|
|
|
.and_then(|(current_bm_cs, opt)| match opt {
|
|
|
|
Some(last_rev_bytes) => Ok((current_bm_cs, last_rev_bytes)),
|
|
|
|
None => Err(ErrorKind::NoLastRevision.into()),
|
|
|
|
})
|
|
|
|
.and_then(|(current_bm_cs, last_rev_bytes)| {
|
2020-04-08 18:56:17 +03:00
|
|
|
let node_hash = ChangesetId::from_bytes(&*last_rev_bytes.payload.payload)?;
|
2018-08-08 15:24:56 +03:00
|
|
|
Ok((current_bm_cs, node_hash))
|
|
|
|
})
|
2019-04-17 17:56:57 +03:00
|
|
|
.and_then({
|
|
|
|
cloned!(
|
|
|
|
self.bookmark,
|
|
|
|
self.excludes,
|
|
|
|
self.hook_manager,
|
|
|
|
self.repo,
|
|
|
|
self.ctx
|
2018-08-08 15:24:56 +03:00
|
|
|
);
|
2019-04-17 17:56:57 +03:00
|
|
|
move |(current_bm_cs, last_rev)| {
|
|
|
|
let end_rev = current_bm_cs;
|
|
|
|
info!(
|
2019-10-16 21:02:19 +03:00
|
|
|
ctx.logger(),
|
2019-04-17 17:56:57 +03:00
|
|
|
"Bookmark is currently at {}, last processed revision is {}",
|
|
|
|
end_rev,
|
|
|
|
last_rev
|
|
|
|
);
|
|
|
|
if last_rev == end_rev {
|
2019-10-16 21:02:19 +03:00
|
|
|
info!(ctx.logger(), "Nothing to do");
|
2019-04-17 17:56:57 +03:00
|
|
|
}
|
2020-04-16 12:12:29 +03:00
|
|
|
run_in_range0(
|
2019-04-17 17:56:57 +03:00
|
|
|
ctx,
|
|
|
|
repo,
|
|
|
|
hook_manager,
|
|
|
|
last_rev,
|
|
|
|
end_rev,
|
|
|
|
bookmark,
|
|
|
|
excludes,
|
|
|
|
)
|
2018-08-08 15:24:56 +03:00
|
|
|
.map(move |res| (end_rev, res))
|
2019-04-17 17:56:57 +03:00
|
|
|
}
|
2018-08-08 15:24:56 +03:00
|
|
|
})
|
2019-04-17 17:56:57 +03:00
|
|
|
.and_then({
|
2019-10-16 21:02:19 +03:00
|
|
|
cloned!(self.last_rev_key, self.ctx, self.manifold_client);
|
2019-04-17 17:56:57 +03:00
|
|
|
move |(end_rev, res)| {
|
2019-10-16 21:02:19 +03:00
|
|
|
info!(
|
|
|
|
ctx.logger(),
|
|
|
|
"Setting last processed revision to {:?}", end_rev
|
|
|
|
);
|
2020-04-08 18:56:17 +03:00
|
|
|
let bytes = end_rev.as_ref().into();
|
2019-04-17 17:56:57 +03:00
|
|
|
manifold_client.write(last_rev_key, bytes).map(|()| res)
|
|
|
|
}
|
2018-08-08 15:24:56 +03:00
|
|
|
})
|
|
|
|
.boxify()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-17 18:03:39 +03:00
|
|
|
fn run_hooks_for_changeset(
|
2018-11-30 21:11:37 +03:00
|
|
|
ctx: CoreContext,
|
2019-01-09 14:49:56 +03:00
|
|
|
repo: BlobRepo,
|
2018-10-17 18:03:39 +03:00
|
|
|
hm: Arc<HookManager>,
|
2019-05-20 17:54:53 +03:00
|
|
|
bm: BookmarkName,
|
2020-04-08 18:56:17 +03:00
|
|
|
cs_id: ChangesetId,
|
|
|
|
) -> impl Future<Item = (ChangesetId, Vec<HookOutcome>), Error = Error> {
|
|
|
|
cs_id
|
|
|
|
.load(ctx.clone(), repo.blobstore())
|
|
|
|
.from_err()
|
|
|
|
.and_then(move |cs| {
|
2020-03-18 19:13:05 +03:00
|
|
|
let ctx = ctx.clone();
|
|
|
|
let hm = hm.clone();
|
|
|
|
let bm = bm.clone();
|
|
|
|
async move {
|
2020-04-08 18:56:17 +03:00
|
|
|
debug!(ctx.logger(), "Running hooks for changeset {:?}", cs);
|
2020-04-11 14:25:18 +03:00
|
|
|
let hook_results = hm
|
|
|
|
.run_hooks_for_bookmark(&ctx, vec![cs].iter(), &bm, None)
|
2020-04-08 18:56:17 +03:00
|
|
|
.await?;
|
|
|
|
Ok((cs_id, hook_results))
|
2020-03-18 19:13:05 +03:00
|
|
|
}
|
|
|
|
.boxed()
|
|
|
|
.compat()
|
2018-10-17 18:03:39 +03:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-04-16 12:12:29 +03:00
|
|
|
fn run_in_range0(
|
|
|
|
ctx: CoreContext,
|
|
|
|
repo: BlobRepo,
|
|
|
|
hm: Arc<HookManager>,
|
|
|
|
last_rev: ChangesetId,
|
|
|
|
end_rev: ChangesetId,
|
|
|
|
bm: BookmarkName,
|
|
|
|
excludes: HashSet<ChangesetId>,
|
|
|
|
) -> BoxFuture<Vec<HookOutcome>, Error> {
|
|
|
|
debug!(ctx.logger(), "Running in range {} to {}", last_rev, end_rev);
|
|
|
|
AncestorsNodeStream::new(ctx.clone(), &repo.get_changeset_fetcher(), end_rev)
|
|
|
|
.take(1000) // Limit number so we don't process too many
|
|
|
|
.filter(move |cs| !excludes.contains(cs))
|
|
|
|
.map({
|
|
|
|
move |cs| {
|
|
|
|
cloned!(ctx, bm, hm, repo);
|
|
|
|
run_hooks_for_changeset(ctx, repo, hm, bm, cs)
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.map(spawn_future)
|
|
|
|
.buffered(100)
|
|
|
|
.take_while(move |(cs, _)| {
|
|
|
|
Ok(*cs != last_rev)
|
|
|
|
})
|
|
|
|
.map(|(_, res)| res)
|
|
|
|
.concat2()
|
|
|
|
.boxify()
|
|
|
|
}
|
|
|
|
|
2019-11-15 09:02:57 +03:00
|
|
|
#[derive(Debug, Error)]
|
2018-08-08 15:24:56 +03:00
|
|
|
pub enum ErrorKind {
|
2019-11-15 09:02:57 +03:00
|
|
|
#[error("No such bookmark '{0}'")]
|
2019-05-20 17:54:53 +03:00
|
|
|
NoSuchBookmark(BookmarkName),
|
2019-11-15 09:02:57 +03:00
|
|
|
#[error("Cannot find last revision in blobstore")]
|
2019-01-14 20:29:33 +03:00
|
|
|
NoLastRevision,
|
2019-11-15 09:02:57 +03:00
|
|
|
#[error("Cannot find bonsai for {0}")]
|
2019-01-14 20:29:33 +03:00
|
|
|
BonsaiNotFound(HgChangesetId),
|
2018-08-08 15:24:56 +03:00
|
|
|
}
|