Implement hook tailer

Summary:
This diff implements a service that tails a configurable bookmark and runs hooks against the commits that land on it.
It uses the standard Mononoke config from the meta config repo.

Reviewed By: StanislavGlebik

Differential Revision: D8898637

fbshipit-source-id: f710fe4c9bda1b78bd17eb6cd6abf2abda4fdb8e
Authored by Tim Fox on 2018-08-08 05:24:56 -07:00; committed by Facebook Github Bot
parent 2632421afe
commit a82b90c6f9
5 changed files with 537 additions and 8 deletions


@ -49,6 +49,7 @@ use hooks::{BlobRepoChangesetStore, BlobRepoFileContentStore, HookExecution, Hoo
use hooks::lua_hook::LuaHook;
use mercurial_types::{HgChangesetId, RepositoryId};
use slog::{Drain, Level, Logger};
use slog::Discard;
use slog_glog_fmt::default_drain as glog_drain;
use std::env::args;
use std::fs::File;
@ -112,8 +113,15 @@ fn run_hook(
let changeset_store = Box::new(BlobRepoChangesetStore::new(repo.clone()));
let content_store = Arc::new(BlobRepoFileContentStore::new(repo.clone()));
let mut hook_manager =
HookManager::new(repo_name, changeset_store, content_store, 1024, 1024 * 1024);
let logger = Logger::root(Discard {}.ignore_res(), o!());
let mut hook_manager = HookManager::new(
repo_name,
changeset_store,
content_store,
1024,
1024 * 1024,
logger,
);
let hook = LuaHook {
name: String::from("testhook"),
code,

hook_tailer/main.rs (new file, 255 lines)

@ -0,0 +1,255 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
#![deny(warnings)]
#![feature(never_type)]
#[cfg(test)]
extern crate async_unit;
extern crate blobrepo;
extern crate blobstore;
extern crate bookmarks;
extern crate bytes;
extern crate clap;
#[macro_use]
extern crate failure_ext as failure;
extern crate futures;
extern crate futures_ext;
extern crate hooks;
extern crate manifold;
extern crate mercurial_types;
extern crate metaconfig;
extern crate mononoke_types;
extern crate repo_client;
extern crate revset;
#[macro_use]
extern crate slog;
extern crate slog_glog_fmt;
extern crate slog_kvfilter;
extern crate slog_logview;
extern crate slog_scuba;
extern crate slog_stats;
extern crate slog_term;
extern crate tokio;
extern crate tokio_timer;
pub mod tailer;
use blobrepo::BlobRepo;
use bookmarks::Bookmark;
use clap::{App, ArgMatches};
use failure::Error;
use failure::Result;
use futures::future::{loop_fn, Future, Loop};
use futures_ext::{BoxFuture, FutureExt};
use manifold::{ManifoldHttpClient, RequestContext};
use mercurial_types::{HgNodeHash, RepositoryId};
use metaconfig::RepoConfigs;
use repo_client::MononokeRepo;
use slog::{Drain, Level, Logger};
use slog_glog_fmt::{kv_categorizer, kv_defaults, GlogFormat};
use slog_logview::LogViewDrain;
use slog_scuba::ScubaDrain;
use std::io;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use tailer::Tailer;
use tokio_timer::sleep;
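// Entry point: load the repo config from the config repo, open the target
// repo, optionally seed the last-processed revision in Manifold from
// --init_revision, then hand off to the poller.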
fn main() -> Result<()> {
let matches = setup_app().get_matches();
let repo_name = matches.value_of("repo_name").unwrap();
let logger = setup_logger(&matches, repo_name.to_string());
info!(logger, "Hook tailer is starting");
let configs = get_config(&logger, &matches)?;
let bookmark_name = matches.value_of("bookmark").unwrap();
let bookmark = Bookmark::new(bookmark_name).unwrap();
let err: Error = ErrorKind::NoSuchRepo(repo_name.into()).into();
let config = configs.repos.get(repo_name).ok_or(err)?;
let mononoke_repo = MononokeRepo::new(
logger.clone(),
&config.repotype,
RepositoryId::new(config.repoid),
)?;
let rc = RequestContext {
bucket_name: "mononoke_prod".into(),
api_key: "mononoke_prod-key".into(),
timeout_msec: 10000,
};
let id = "ManifoldBlob";
let manifold_client = ManifoldHttpClient::new(id, rc)?;
let tailer = Tailer::new(
repo_name.to_string(),
mononoke_repo.blobrepo(),
config.clone(),
bookmark,
manifold_client.clone(),
logger.clone(),
)?;
let fut = match matches.value_of("init_revision") {
Some(init_rev) => {
info!(logger, "Initial revision specified as argument {}", init_rev);
let hash = HgNodeHash::from_str(init_rev)?;
let bytes = hash.as_bytes().into();
manifold_client
.write(tailer.get_last_rev_key(), bytes)
.map(|_| ())
.boxify()
}
None => futures::future::ok(()).boxify(),
};
let fut = fut.then(|_| create_poller(tailer, logger));
tokio::run(fut);
Ok(())
}
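// Drives the tailer forever: each pass runs the hooks, logs the per-file and
// per-changeset results, sleeps ten seconds, then continues the loop.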
fn create_poller(tailer: Tailer, logger: Logger) -> BoxFuture<(), ()> {
let logger2 = logger.clone();
let logger3 = logger.clone();
let lf = loop_fn(tailer, move |tailer| {
let logger4 = logger2.clone();
tailer
.run()
.map(move |res| {
res.into_iter().for_each(|(v_files, v_cs)| {
info!(logger4, "==== File hooks results ====");
v_files.into_iter().for_each(|(exec_id, exec)| {
info!(
logger4,
"changeset:{} hook_name:{} path:{} result:{:?}",
exec_id.cs_id,
exec_id.hook_name,
exec_id.path,
exec
);
});
info!(logger4, "==== Changeset hooks results ====");
v_cs.into_iter().for_each(|(exec_id, exec)| {
info!(
logger4,
"changeset:{} hook_name:{} result:{:?}",
exec_id.cs_id,
exec_id.hook_name,
exec
);
});
});
()
})
.and_then(|()| {
sleep(Duration::new(10, 0))
.map_err(|err| format_err!("Tokio timer error {:?}", err))
})
.and_then(move |()| Ok(Loop::Continue(tailer)))
});
lf.map_err(move |err| {
error!(logger3, "Failed to run tailer {:?}", err);
()
}).boxify()
}
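// CLI definition. An illustrative invocation (paths and names are examples):
//   hook_tailer -P /path/to/config_repo -C config_bookmark -B master -R myrepo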
fn setup_app<'a, 'b>() -> App<'a, 'b> {
App::new("mononoke hook tailer")
.version("0.0.0")
.about("runs hooks against commits on a repo bookmark")
.args_from_usage(
r#"
<crpath> -P, --configrepo_path [PATH] 'path to the config repo in rocksdb form'
-H, --crhash [HASH] 'config repo commit hash'
<crbook> -C, --configrepo_book [BOOK] 'config repo bookmark'
<bookmark> -B, --bookmark [BOOK] 'bookmark to tail'
--poll-interval 'the poll interval in seconds'
<repo_name> -R, --repo_name [REPO_NAME] 'the name of the repo to run hooks for'
--init_revision [INIT_REVISION] 'the initial revision to start at'
-d, --debug 'print debug level output'
"#,
)
}
fn setup_logger<'a>(matches: &ArgMatches<'a>, repo_name: String) -> Logger {
let level = if matches.is_present("debug") {
Level::Debug
} else {
Level::Info
};
let drain = {
let drain = {
let decorator = slog_term::PlainSyncDecorator::new(io::stdout());
let stderr_drain = GlogFormat::new(decorator, kv_categorizer::FacebookCategorizer);
let logview_drain = LogViewDrain::new("mononoke_hook_tailer_log");
let scuba_drain = ScubaDrain::new("mononoke_hook_tailer");
let drain = slog::Duplicate::new(stderr_drain, logview_drain);
slog::Duplicate::new(scuba_drain, drain)
};
let drain = slog_stats::StatsDrain::new(drain);
drain.filter_level(level)
};
Logger::root(
drain.fuse(),
o!("repo" => repo_name,
kv_defaults::FacebookKV::new().expect("Failed to initialize logging")),
)
}
fn get_config<'a>(logger: &Logger, matches: &ArgMatches<'a>) -> Result<RepoConfigs> {
let crpath = PathBuf::from(matches.value_of("crpath").unwrap());
let config_repo = BlobRepo::new_rocksdb(
logger.new(o!["repo" => "Config repo"]),
&crpath,
RepositoryId::new(0),
)?;
let changesetid = match matches.value_of("crbook") {
Some(book) => {
let book = bookmarks::Bookmark::new(book).expect("book must be ascii");
config_repo
.get_bookmark(&book)
.wait()?
.expect("bookmark not found")
}
None => mercurial_types::nodehash::HgChangesetId::from_str(
matches
.value_of("crhash")
.expect("crhash and crbook are not specified"),
)?,
};
info!(
logger,
"Config repository will be read from commit: {}", changesetid
);
RepoConfigs::read_config_repo(config_repo, changesetid)
.from_err()
.wait()
}
#[derive(Debug, Fail)]
pub enum ErrorKind {
#[fail(display = "No such repo '{}'", _0)] NoSuchRepo(String),
}

hook_tailer/tailer.rs (new file, 199 lines)

@ -0,0 +1,199 @@
// Copyright (c) 2004-present, Facebook, Inc.
// All Rights Reserved.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.
#![deny(warnings)]
use blobrepo::BlobRepo;
use bookmarks::Bookmark;
use failure::Error;
use failure::Result;
use futures::{Future, Stream};
use futures_ext::{BoxFuture, FutureExt};
use hooks::{BlobRepoChangesetStore, BlobRepoFileContentStore, ChangesetHookExecutionID,
FileHookExecutionID, HookExecution, HookManager, hook_loader::load_hooks};
use manifold::{ManifoldHttpClient, PayloadRange};
use mercurial_types::{HgChangesetId, HgNodeHash};
use metaconfig::repoconfig::RepoConfig;
use revset::AncestorsNodeStream;
use slog::Logger;
use std::sync::Arc;
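// Tails a single bookmark of one repo, running hooks over new commits and
// persisting the last processed revision in Manifold between runs.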
pub struct Tailer {
repo: Arc<BlobRepo>,
hook_manager: Arc<HookManager>,
bookmark: Bookmark,
last_rev_key: String,
manifold_client: ManifoldHttpClient,
logger: Logger,
}
impl Tailer {
pub fn new(
repo_name: String,
repo: Arc<BlobRepo>,
config: RepoConfig,
bookmark: Bookmark,
manifold_client: ManifoldHttpClient,
logger: Logger,
) -> Result<Tailer> {
let changeset_store = BlobRepoChangesetStore::new((*repo).clone());
let content_store = BlobRepoFileContentStore::new((*repo).clone());
let mut hook_manager = HookManager::new(
repo_name,
Box::new(changeset_store),
Arc::new(content_store),
1024 * 1024,
1024 * 1024 * 1024,
logger.clone(),
);
load_hooks(&mut hook_manager, config)?;
let repo_id = repo.get_repoid().id();
let last_rev_key = format!("__mononoke_hook_tailer_last_rev.{}", repo_id);
Ok(Tailer {
repo,
hook_manager: Arc::new(hook_manager),
bookmark,
last_rev_key,
manifold_client,
logger,
})
}
pub fn get_last_rev_key(&self) -> String {
self.last_rev_key.clone()
}
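// Walks ancestors of end_rev (newest first, capped at 1000) until last_rev is
// reached, running file hooks and then changeset hooks for each changeset.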
fn run_in_range0(
repo: Arc<BlobRepo>,
hm: Arc<HookManager>,
last_rev: HgNodeHash,
end_rev: HgNodeHash,
bm: Bookmark,
logger: Logger,
) -> BoxFuture<
Vec<
(
Vec<(FileHookExecutionID, HookExecution)>,
Vec<(ChangesetHookExecutionID, HookExecution)>,
),
>,
Error,
> {
debug!(logger, "Running in range {} to {}", last_rev, end_rev);
let logger = logger.clone();
let logger2 = logger.clone();
let hm2 = hm.clone();
let bm2 = bm.clone();
AncestorsNodeStream::new(&repo, end_rev)
.take(1000) // Limit number so we don't process too many
.take_while(move |node_hash| {
Ok(*node_hash != last_rev)
})
.and_then(move |node_hash| {
info!(logger, "Running file hooks for changeset {:?}", node_hash);
hm.run_file_hooks_for_bookmark(HgChangesetId::new(node_hash.clone()), &bm)
.map(move |res| (node_hash, res))
})
.and_then(move |(node_hash, file_res)| {
info!(logger2, "Running changeset hooks for changeset {:?}", node_hash);
hm2.run_changeset_hooks_for_bookmark(HgChangesetId::new(node_hash.clone()), &bm2)
.map(|res| (file_res, res))
})
.collect()
.boxify()
}
pub fn run_in_range(
&self,
last_rev: HgNodeHash,
end_rev: HgNodeHash,
) -> BoxFuture<
Vec<
(
Vec<(FileHookExecutionID, HookExecution)>,
Vec<(ChangesetHookExecutionID, HookExecution)>,
),
>,
Error,
> {
let repo = self.repo.clone();
let hm = self.hook_manager.clone();
let bm = self.bookmark.clone();
Tailer::run_in_range0(repo, hm, last_rev, end_rev, bm, self.logger.clone())
}
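// One full tailer pass: read the bookmark's current position and the last
// processed revision from Manifold, run hooks over everything in between,
// then store the bookmark position as the new last processed revision.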
pub fn run(
&self,
) -> BoxFuture<
Vec<
(
Vec<(FileHookExecutionID, HookExecution)>,
Vec<(ChangesetHookExecutionID, HookExecution)>,
),
>,
Error,
> {
let bm = self.bookmark.clone();
let bm2 = bm.clone();
let repo = self.repo.clone();
let hm = self.hook_manager.clone();
let last_rev_key = self.last_rev_key.clone();
let last_rev_key2 = last_rev_key.clone();
let manifold_client = self.manifold_client.clone();
let manifold_client2 = manifold_client.clone();
info!(self.logger, "Running tailer on bookmark {}", bm);
let logger = self.logger.clone();
let logger2 = logger.clone();
let logger3 = logger.clone();
self.repo
.get_bookmark(&bm)
.and_then(|opt| opt.ok_or(ErrorKind::NoSuchBookmark(bm).into()))
.and_then(move |current_bm_cs| {
manifold_client
.read(last_rev_key, PayloadRange::Full)
.map(move |opt| (current_bm_cs, opt))
})
.and_then(|(current_bm_cs, opt)| match opt {
Some(last_rev_bytes) => Ok((current_bm_cs, last_rev_bytes)),
None => Err(ErrorKind::NoLastRevision.into()),
})
.and_then(|(current_bm_cs, last_rev_bytes)| {
let node_hash = HgNodeHash::from_bytes(&*last_rev_bytes.payload.payload)?;
Ok((current_bm_cs, node_hash))
})
.and_then(move |(current_bm_cs, last_rev)| {
let end_rev = current_bm_cs.into_nodehash();
info!(
logger,
"Bookmark is currently at {}, last processed revision is {}", end_rev, last_rev
);
if last_rev == end_rev {
info!(logger, "Nothing to do");
}
Tailer::run_in_range0(repo, hm, last_rev, end_rev, bm2, logger3)
.map(move |res| (end_rev, res))
})
.and_then(move |(end_rev, res)| {
info!(logger2, "Setting last processed revision to {:?}", end_rev);
let bytes = end_rev.as_bytes().into();
manifold_client2.write(last_rev_key2, bytes).map(|()| res)
})
.boxify()
}
}
#[derive(Debug, Fail)]
pub enum ErrorKind {
#[fail(display = "No such bookmark '{}'", _0)] NoSuchBookmark(Bookmark),
#[fail(display = "Cannot find last revision in blobstore")] NoLastRevision,
}


@ -67,6 +67,7 @@ mod test {
use async_unit;
use fixtures::many_files_dirs;
use metaconfig::repoconfig::{BookmarkParams, HookParams, RepoType};
use slog::{Discard, Drain};
#[test]
fn test_load_hooks() {
@ -158,12 +159,14 @@ mod test {
let repo = many_files_dirs::getrepo(None);
let changeset_store = BlobRepoChangesetStore::new(repo.clone());
let content_store = BlobRepoFileContentStore::new(repo);
let logger = Logger::root(Discard {}.ignore_res(), o!());
HookManager::new(
"some_repo".into(),
Box::new(changeset_store),
Arc::new(content_store),
1024,
1024 * 1024,
logger,
)
}


@ -40,6 +40,8 @@ extern crate maplit;
extern crate mercurial_types;
extern crate metaconfig;
extern crate mononoke_types;
#[macro_use]
extern crate slog;
#[cfg(test)]
extern crate tempdir;
@ -58,6 +60,7 @@ use futures::{failed, finished, Future};
use futures_ext::{BoxFuture, FutureExt};
use mercurial_types::{Changeset, HgChangesetId, HgParents, MPath};
use mononoke_types::FileContents;
use slog::Logger;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::mem;
@ -77,6 +80,7 @@ pub struct HookManager {
bookmark_hooks: HashMap<Bookmark, Vec<String>>,
repo_name: String,
changeset_store: Box<ChangesetStore>,
logger: Logger,
}
impl HookManager {
@ -86,6 +90,7 @@ impl HookManager {
content_store: Arc<FileContentStore>,
entrylimit: usize,
weightlimit: usize,
logger: Logger,
) -> HookManager {
let changeset_hooks = HashMap::new();
let file_hooks = Arc::new(Mutex::new(HashMap::new()));
@ -104,6 +109,7 @@ impl HookManager {
bookmark_hooks: HashMap::new(),
repo_name,
changeset_store,
logger,
}
}
@ -142,9 +148,16 @@ impl HookManager {
&self,
changeset_id: HgChangesetId,
bookmark: &Bookmark,
) -> BoxFuture<Vec<(String, HookExecution)>, Error> {
) -> BoxFuture<Vec<(ChangesetHookExecutionID, HookExecution)>, Error> {
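// A bookmark may reference both file and changeset hooks; keep only the
// names registered as changeset hooks before running them.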
match self.bookmark_hooks.get(bookmark) {
Some(hooks) => self.run_changeset_hooks_for_changeset_id(changeset_id, hooks.to_vec()),
Some(hooks) => {
let hooks = hooks
.clone()
.into_iter()
.filter(|name| self.changeset_hooks.contains_key(name))
.collect();
self.run_changeset_hooks_for_changeset_id(changeset_id, hooks)
}
None => return finished(Vec::new()).boxify(),
}
}
@ -153,7 +166,7 @@ impl HookManager {
&self,
changeset_id: HgChangesetId,
hooks: Vec<String>,
) -> BoxFuture<Vec<(String, HookExecution)>, Error> {
) -> BoxFuture<Vec<(ChangesetHookExecutionID, HookExecution)>, Error> {
let hooks: Result<Vec<(String, Arc<Hook<HookChangeset>>)>, Error> = hooks
.iter()
.map(|hook_name| {
@ -173,6 +186,19 @@ impl HookManager {
hooks.clone(),
)
})
.map(move |res| {
res.into_iter()
.map(|(hook_name, exec)| {
(
ChangesetHookExecutionID {
cs_id: changeset_id,
hook_name,
},
exec,
)
})
.collect()
})
.boxify()
}
@ -209,8 +235,21 @@ impl HookManager {
changeset_id: HgChangesetId,
bookmark: &Bookmark,
) -> BoxFuture<Vec<(FileHookExecutionID, HookExecution)>, Error> {
debug!(self.logger, "Running file hooks for bookmark {:?}", bookmark);
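// As with changeset hooks, keep only the names actually registered as file hooks.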
match self.bookmark_hooks.get(bookmark) {
Some(hooks) => self.run_file_hooks_for_changeset_id(changeset_id, hooks.to_vec()),
Some(hooks) => {
let file_hooks = self.file_hooks.lock().unwrap();
let hooks = hooks
.clone()
.into_iter()
.filter(|name| file_hooks.contains_key(name))
.collect();
self.run_file_hooks_for_changeset_id(changeset_id, hooks, self.logger.clone())
}
None => return Box::new(finished(Vec::new())),
}
}
@ -219,7 +258,12 @@ impl HookManager {
&self,
changeset_id: HgChangesetId,
hooks: Vec<String>,
logger: Logger,
) -> BoxFuture<Vec<(FileHookExecutionID, HookExecution)>, Error> {
debug!(
self.logger,
"Running file hooks for changeset id {:?}", changeset_id
);
let cache = self.cache.clone();
self.get_hook_changeset(changeset_id)
.and_then(move |hcs| {
@ -228,6 +272,7 @@ impl HookManager {
hcs.clone(),
hooks.clone(),
cache,
logger,
)
})
.boxify()
@ -238,6 +283,7 @@ impl HookManager {
changeset: HookChangeset,
hooks: Vec<String>,
cache: Cache,
logger: Logger,
) -> BoxFuture<Vec<(FileHookExecutionID, HookExecution)>, Error> {
let v: Vec<BoxFuture<Vec<(FileHookExecutionID, HookExecution)>, _>> = changeset
.files
@ -248,6 +294,7 @@ impl HookManager {
path.to_string(),
hooks.clone(),
cache.clone(),
logger.clone(),
)
})
.collect();
@ -261,6 +308,7 @@ impl HookManager {
path: String,
hooks: Vec<String>,
cache: Cache,
logger: Logger,
) -> BoxFuture<Vec<(FileHookExecutionID, HookExecution)>, Error> {
let v: Vec<BoxFuture<(FileHookExecutionID, HookExecution), _>> = hooks
.iter()
@ -272,6 +320,7 @@ impl HookManager {
path: path.clone(),
},
cache.clone(),
logger.clone(),
)
})
.collect();
@ -281,7 +330,9 @@ impl HookManager {
fn run_file_hook(
key: FileHookExecutionID,
cache: Cache,
logger: Logger,
) -> BoxFuture<(FileHookExecutionID, HookExecution), Error> {
debug!(logger, "Running file hook {:?}", key);
cache.get(key.clone()).map(|he| (key, he)).boxify()
}
@ -557,7 +608,7 @@ impl Filler for HookCacheFiller {
HookContext::new(key.hook_name.clone(), self.repo_name.clone(), hook_file);
arc_hook.run(hook_context)
}
None => panic!("Can't find hook"), // TODO
None => panic!("Can't find hook {}", key.hook_name), // TODO
}
}
}
@ -571,6 +622,12 @@ pub struct FileHookExecutionID {
pub path: String,
}
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
pub struct ChangesetHookExecutionID {
pub cs_id: HgChangesetId,
pub hook_name: String,
}
impl Weight for FileHookExecutionID {
fn get_weight(&self) -> usize {
self.cs_id.get_weight() + self.hook_name.get_weight() + self.path.get_weight()
@ -645,6 +702,7 @@ mod test {
use fixtures::many_files_dirs;
use futures::Future;
use futures::future::finished;
use slog::{Discard, Drain};
use std::collections::hash_map::Entry;
use std::str::FromStr;
@ -1062,7 +1120,9 @@ mod test {
&Bookmark::new(bookmark_name).unwrap(),
);
let res = fut.wait().unwrap();
let map: HashMap<String, HookExecution> = res.into_iter().collect();
let map: HashMap<String, HookExecution> = res.into_iter()
.map(|(exec_id, exec)| (exec_id.hook_name, exec))
.collect();
assert_eq!(expected, map);
}
@ -1128,12 +1188,14 @@ mod test {
let repo = many_files_dirs::getrepo(None);
let changeset_store = BlobRepoChangesetStore::new(repo.clone());
let content_store = BlobRepoFileContentStore::new(repo);
let logger = Logger::root(Discard {}.ignore_res(), o!());
HookManager::new(
"some_repo".into(),
Box::new(changeset_store),
Arc::new(content_store),
1024,
1024 * 1024,
logger,
)
}
@ -1157,12 +1219,14 @@ mod test {
(cs_id.clone(), to_mpath("dir1/subdir1/subsubdir2/file_2")),
"content_d2_f2".into(),
);
let logger = Logger::root(Discard {}.ignore_res(), o!());
HookManager::new(
"some_repo".into(),
Box::new(changeset_store),
Arc::new(content_store),
1024,
1024 * 1024,
logger,
)
}