// Copyright (c) 2004-present, Facebook, Inc. // All Rights Reserved. // // This software may be used and distributed according to the terms of the // GNU General Public License version 2 or any later version. #![deny(warnings)] use blobrepo::BlobRepo; use bookmarks::Bookmark; use failure::Error; use failure::Result; use futures::{Future, Stream}; use futures_ext::{BoxFuture, FutureExt}; use hooks::{BlobRepoChangesetStore, BlobRepoFileContentStore, ChangesetHookExecutionID, FileHookExecutionID, HookExecution, HookManager, hook_loader::load_hooks}; use manifold::{ManifoldHttpClient, PayloadRange}; use mercurial_types::{HgChangesetId, HgNodeHash}; use metaconfig::repoconfig::RepoConfig; use revset::AncestorsNodeStream; use slog::Logger; use std::sync::Arc; pub struct Tailer { repo: Arc, hook_manager: Arc, bookmark: Bookmark, last_rev_key: String, manifold_client: ManifoldHttpClient, logger: Logger, } impl Tailer { pub fn new( repo_name: String, repo: Arc, config: RepoConfig, bookmark: Bookmark, manifold_client: ManifoldHttpClient, logger: Logger, ) -> Result { let changeset_store = BlobRepoChangesetStore::new((*repo).clone()); let content_store = BlobRepoFileContentStore::new((*repo).clone()); let mut hook_manager = HookManager::new( repo_name, Box::new(changeset_store), Arc::new(content_store), 1024 * 1024, 1024 * 1024 * 1024, logger.clone(), ); load_hooks(&mut hook_manager, config)?; let repo_id = repo.get_repoid().id(); let last_rev_key = format!("{}{}", "__mononoke_hook_tailer_last_rev.", repo_id).to_string(); Ok(Tailer { repo, hook_manager: Arc::new(hook_manager), bookmark, last_rev_key, manifold_client, logger, }) } pub fn get_last_rev_key(&self) -> String { self.last_rev_key.clone() } fn run_in_range0( repo: Arc, hm: Arc, last_rev: HgNodeHash, end_rev: HgNodeHash, bm: Bookmark, logger: Logger, ) -> BoxFuture< Vec< ( Vec<(FileHookExecutionID, HookExecution)>, Vec<(ChangesetHookExecutionID, HookExecution)>, ), >, Error, > { debug!(logger, "Running in range {} to {}", last_rev, end_rev); let logger = logger.clone(); let logger2 = logger.clone(); let hm2 = hm.clone(); let bm2 = bm.clone(); AncestorsNodeStream::new(&repo, end_rev) .take(1000) // Limit number so we don't process too many .take_while(move |node_hash| { Ok(*node_hash != last_rev) }) .and_then(move |node_hash| { info!(logger, "Running file hooks for changeset {:?}", node_hash); hm.run_file_hooks_for_bookmark(HgChangesetId::new(node_hash.clone()), &bm) .map(move |res| (node_hash, res)) }) .and_then(move |(node_hash, file_res)| { info!(logger2, "Running changeset hooks for changeset {:?}", node_hash); hm2.run_changeset_hooks_for_bookmark(HgChangesetId::new(node_hash.clone()), &bm2) .map(|res| (file_res, res)) }) .collect() .boxify() } pub fn run_in_range( &self, last_rev: HgNodeHash, end_rev: HgNodeHash, ) -> BoxFuture< Vec< ( Vec<(FileHookExecutionID, HookExecution)>, Vec<(ChangesetHookExecutionID, HookExecution)>, ), >, Error, > { let repo = self.repo.clone(); let hm = self.hook_manager.clone(); let bm = self.bookmark.clone(); Tailer::run_in_range0(repo, hm, last_rev, end_rev, bm, self.logger.clone()) } pub fn run( &self, ) -> BoxFuture< Vec< ( Vec<(FileHookExecutionID, HookExecution)>, Vec<(ChangesetHookExecutionID, HookExecution)>, ), >, Error, > { let bm = self.bookmark.clone(); let bm2 = bm.clone(); let repo = self.repo.clone(); let hm = self.hook_manager.clone(); let last_rev_key = self.last_rev_key.clone(); let last_rev_key2 = last_rev_key.clone(); let manifold_client = self.manifold_client.clone(); let manifold_client2 = manifold_client.clone(); info!(self.logger, "Running tailer on bookmark {}", bm); let logger = self.logger.clone(); let logger2 = logger.clone(); let logger3 = logger.clone(); self.repo .get_bookmark(&bm) .and_then(|opt| opt.ok_or(ErrorKind::NoSuchBookmark(bm).into())) .and_then(move |current_bm_cs| { manifold_client .read(last_rev_key, PayloadRange::Full) .map(move |opt| (current_bm_cs, opt)) }) .and_then(|(current_bm_cs, opt)| match opt { Some(last_rev_bytes) => Ok((current_bm_cs, last_rev_bytes)), None => Err(ErrorKind::NoLastRevision.into()), }) .and_then(|(current_bm_cs, last_rev_bytes)| { let node_hash = HgNodeHash::from_bytes(&*last_rev_bytes.payload.payload)?; Ok((current_bm_cs, node_hash)) }) .and_then(move |(current_bm_cs, last_rev)| { let end_rev = current_bm_cs.into_nodehash(); info!( logger, "Bookmark is currently at {}, last processed revision is {}", end_rev, last_rev ); if last_rev == end_rev { info!(logger, "Nothing to do"); } Tailer::run_in_range0(repo, hm, last_rev, end_rev, bm2, logger3) .map(move |res| (end_rev, res)) }) .and_then(move |(end_rev, res)| { info!(logger2, "Setting last processed revision to {:?}", end_rev); let bytes = end_rev.as_bytes().into(); manifold_client2.write(last_rev_key2, bytes).map(|()| res) }) .boxify() } } #[derive(Debug, Fail)] pub enum ErrorKind { #[fail(display = "No such bookmark '{}'", _0)] NoSuchBookmark(Bookmark), #[fail(display = "Cannot find last revision in blobstore")] NoLastRevision, }