feat: 🎸 add watch mode #12

Closes: #12
This commit is contained in:
Stepan Kuzmin 2019-03-16 18:53:12 +03:00
parent 66a850e215
commit 5eeef48b30
7 changed files with 122 additions and 73 deletions

View File

@ -85,6 +85,8 @@ Table Sources list endpoint is available at `/index.json`
curl localhost:3000/index.json
```
**Note**: if in `watch` mode, this will rescan database for table sources.
### Table Source TileJSON
Table Source [TileJSON](https://github.com/mapbox/tilejson-spec) endpoint is available at `/{schema_name}.{table_name}.json`.
@ -146,6 +148,8 @@ Function Sources list endpoint is available at `/rpc/index.json`
curl localhost:3000/rpc/index.json
```
**Note**: if in `watch` mode, this will rescan database for function sources.
### Function Source TileJSON
Function Source [TileJSON](https://github.com/mapbox/tilejson-spec) endpoint is available at `/rpc/{schema_name}.{function_name}.json`
@ -179,11 +183,12 @@ Usage:
Options:
-h --help Show this screen.
-v --version Show version.
--workers=<n> Number of web server workers.
--pool_size=<n> Maximum connections pool size [default: 20].
--config=<path> Path to config file.
--keep_alive=<n> Connection keep alive timeout [default: 75].
--listen_addresses=<n> The socket address to bind [default: 0.0.0.0:3000].
--config=<path> Path to config file.
--pool_size=<n> Maximum connections pool size [default: 20].
--watch Scan for new sources on sources list requests
--workers=<n> Number of web server workers.
```
## Environment Variables
@ -194,8 +199,9 @@ You can also configure martin using environment variables
| -------------------- | -------------------------------- | ----------------------------- |
| DATABASE_URL | postgres://postgres@localhost/db | postgres database connection |
| DATABASE_POOL_SIZE | 20 | maximum connections pool size |
| WORKER_PROCESSES | 8 | number of web server workers |
| KEEP_ALIVE | 75 | connection keep alive timeout |
| WATCH_MODE | true | scan for new sources |
| WORKER_PROCESSES | 8 | number of web server workers |
## Configuration File
@ -208,6 +214,9 @@ martin --config config.yaml
You can find an example of a configuration file [here](https://github.com/urbica/martin/blob/master/tests/config.yaml).
```yaml
# Database connection string
connection_string: 'postgres://postgres@localhost/db'
# Maximum connections pool size [default: 20]
pool_size: 20
@ -220,6 +229,9 @@ worker_processes: 8
# The socket address to bind [default: 0.0.0.0:3000]
listen_addresses: '0.0.0.0:3000'
# Enable watch mode
watch: true
# associative arrays of table sources
table_sources:
public.table_source:

View File

@ -1,6 +1,6 @@
use actix::*;
use actix_web::*;
use futures::future::Future;
use futures::future::{result, Future};
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
@ -20,33 +20,49 @@ pub struct State {
coordinator: Addr<CoordinatorActor>,
table_sources: Rc<RefCell<Option<TableSources>>>,
function_sources: Rc<RefCell<Option<FunctionSources>>>,
watch_mode: bool,
}
fn get_table_sources(
req: &HttpRequest<State>,
) -> Result<Box<Future<Item = HttpResponse, Error = Error>>> {
let state = &req.state();
let coordinator = state.coordinator.clone();
if state.watch_mode {
info!("Scanning database for table sources");
let coordinator = state.coordinator.clone();
let result = req.state().db.send(messages::GetTableSources {});
let result = req.state().db.send(messages::GetTableSources {});
let response = result
.from_err()
.and_then(move |res| match res {
Ok(table_sources) => {
coordinator.do_send(messages::RefreshTableSources {
table_sources: Some(table_sources.clone()),
});
let response = result
.from_err()
.and_then(move |res| match res {
Ok(table_sources) => {
coordinator.do_send(messages::RefreshTableSources {
table_sources: Some(table_sources.clone()),
});
Ok(HttpResponse::Ok()
.header("Access-Control-Allow-Origin", "*")
.json(table_sources))
}
Err(_) => Ok(HttpResponse::InternalServerError().into()),
})
.responder();
Ok(HttpResponse::Ok()
.header("Access-Control-Allow-Origin", "*")
.json(table_sources))
}
Err(_) => Ok(HttpResponse::InternalServerError().into()),
})
.responder();
return Ok(response);
}
Ok(response)
let table_sources = state
.table_sources
.borrow()
.clone()
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
let http_response = HttpResponse::Ok()
.header("Access-Control-Allow-Origin", "*")
.json(table_sources);
let response = result(Ok(http_response)).responder();
return Ok(response);
}
fn get_table_source(req: &HttpRequest<State>) -> Result<HttpResponse> {
@ -138,26 +154,42 @@ fn get_function_sources(
req: &HttpRequest<State>,
) -> Result<Box<Future<Item = HttpResponse, Error = Error>>> {
let state = &req.state();
let coordinator = state.coordinator.clone();
if state.watch_mode {
info!("Scanning database for function sources");
let coordinator = state.coordinator.clone();
let result = req.state().db.send(messages::GetFunctionSources {});
let response = result
.from_err()
.and_then(move |res| match res {
Ok(function_sources) => {
coordinator.do_send(messages::RefreshFunctionSources {
function_sources: Some(function_sources.clone()),
});
let result = req.state().db.send(messages::GetFunctionSources {});
let response = result
.from_err()
.and_then(move |res| match res {
Ok(function_sources) => {
coordinator.do_send(messages::RefreshFunctionSources {
function_sources: Some(function_sources.clone()),
});
Ok(HttpResponse::Ok()
.header("Access-Control-Allow-Origin", "*")
.json(function_sources))
}
Err(_) => Ok(HttpResponse::InternalServerError().into()),
})
.responder();
Ok(HttpResponse::Ok()
.header("Access-Control-Allow-Origin", "*")
.json(function_sources))
}
Err(_) => Ok(HttpResponse::InternalServerError().into()),
})
.responder();
Ok(response)
return Ok(response);
}
let function_sources = state
.function_sources
.borrow()
.clone()
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
let http_response = HttpResponse::Ok()
.header("Access-Control-Allow-Origin", "*")
.json(function_sources);
let response = result(Ok(http_response)).responder();
return Ok(response);
}
fn get_function_source(req: &HttpRequest<State>) -> Result<HttpResponse> {
@ -248,6 +280,7 @@ pub fn new(
coordinator: Addr<CoordinatorActor>,
table_sources: Option<TableSources>,
function_sources: Option<FunctionSources>,
watch_mode: bool,
) -> App<State> {
let table_sources_rc = Rc::new(RefCell::new(table_sources));
let function_sources_rc = Rc::new(RefCell::new(function_sources));
@ -265,6 +298,7 @@ pub fn new(
coordinator,
table_sources: table_sources_rc.clone(),
function_sources: function_sources_rc.clone(),
watch_mode,
};
App::with_state(state)
@ -328,6 +362,7 @@ mod tests {
State {
db,
coordinator,
watch_mode: true,
table_sources: table_sources_rc.clone(),
function_sources: function_sources_rc.clone(),
}

View File

@ -9,21 +9,23 @@ Usage:
Options:
-h --help Show this screen.
-v --version Show version.
--workers=<n> Number of web server workers.
--pool_size=<n> Maximum connections pool size [default: 20].
--config=<path> Path to config file.
--keep_alive=<n> Connection keep alive timeout [default: 75].
--listen_addresses=<n> The socket address to bind [default: 0.0.0.0:3000].
--config=<path> Path to config file.
--pool_size=<n> Maximum connections pool size [default: 20].
--watch Scan for new sources on sources list requests
--workers=<n> Number of web server workers.
";
#[derive(Debug, Deserialize)]
pub struct Args {
pub arg_connection: Option<String>,
pub flag_config: Option<String>,
pub flag_help: bool,
pub flag_version: bool,
pub flag_workers: Option<usize>,
pub flag_pool_size: Option<u32>,
pub flag_keep_alive: Option<usize>,
pub flag_listen_addresses: Option<String>,
pub flag_config: Option<String>,
pub arg_connection: Option<String>,
pub flag_pool_size: Option<u32>,
pub flag_watch: bool,
pub flag_version: bool,
pub flag_workers: Option<usize>,
}

View File

@ -12,6 +12,7 @@ use super::table_source::{get_table_sources, TableSources};
#[derive(Clone, Debug, Serialize)]
pub struct Config {
pub watch: bool,
pub pool_size: u32,
pub keep_alive: usize,
pub worker_processes: usize,
@ -23,6 +24,7 @@ pub struct Config {
#[derive(Deserialize)]
struct ConfigBuilder {
pub watch: Option<bool>,
pub pool_size: Option<u32>,
pub keep_alive: Option<usize>,
pub worker_processes: Option<usize>,
@ -35,6 +37,7 @@ struct ConfigBuilder {
impl ConfigBuilder {
pub fn finalize(self) -> Config {
Config {
watch: self.watch.unwrap_or(false),
pool_size: self.pool_size.unwrap_or(20),
keep_alive: self.keep_alive.unwrap_or(75),
worker_processes: self.worker_processes.unwrap_or_else(num_cpus::get),
@ -59,16 +62,6 @@ pub fn read_config(file_name: &str) -> io::Result<Config> {
Ok(config_builder.finalize())
}
// pub fn write_config(file_name: &str, config: Config) -> io::Result<()> {
// let mut file = File::create(file_name)?;
// let config = serde_yaml::to_string(&config)
// .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
// file.write_all(config.as_bytes())?;
// Ok(())
// }
pub fn generate_config(
args: Args,
connection_string: String,
@ -82,6 +75,7 @@ pub fn generate_config(
let function_sources = get_function_sources(&conn)?;
let config = ConfigBuilder {
watch: Some(args.flag_watch),
keep_alive: args.flag_keep_alive,
listen_addresses: args.flag_listen_addresses,
connection_string: connection_string,
@ -94,10 +88,3 @@ pub fn generate_config(
let config = config.finalize();
Ok(config)
}
// pub fn to_string(config: Config) -> io::Result<String> {
// let config = serde_yaml::to_string(&config)
// .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
// Ok(config)
// }

View File

@ -75,8 +75,9 @@ fn setup_from_database(args: Args) -> Result<(Config, PostgresPool), std::io::Er
fn start(args: Args) -> Result<actix::SystemRunner, std::io::Error> {
info!("Starting martin v{}", VERSION);
let (config, pool) = if args.flag_config.is_some() {
let file_name = args.flag_config.unwrap();
let config_file_name = args.flag_config.clone();
let (config, pool) = if config_file_name.is_some() {
let file_name = config_file_name.clone().unwrap();
info!("Using {}", file_name);
setup_from_config(file_name)?
} else {
@ -91,8 +92,13 @@ fn start(args: Args) -> Result<actix::SystemRunner, std::io::Error> {
std::process::exit(-1);
}
let watch_mode = config.watch || env::var_os("WATCH_MODE").is_some();
if watch_mode {
info!("Watch mode enabled");
}
let listen_addresses = config.listen_addresses.clone();
let server = server::new(config, pool);
let server = server::new(pool, config, watch_mode);
info!("Martin has been started on {}.", listen_addresses);
Ok(server)

View File

@ -7,7 +7,7 @@ use super::coordinator_actor::CoordinatorActor;
use super::db::PostgresPool;
use super::db_executor::DbExecutor;
pub fn new(config: Config, pool: PostgresPool) -> SystemRunner {
pub fn new(pool: PostgresPool, config: Config, watch_mode: bool) -> SystemRunner {
let server = System::new("server");
let db = SyncArbiter::start(3, move || DbExecutor(pool.clone()));
@ -23,6 +23,7 @@ pub fn new(config: Config, pool: PostgresPool) -> SystemRunner {
coordinator.clone(),
config.table_sources.clone(),
config.function_sources.clone(),
watch_mode,
)
})
.bind(listen_addresses.clone())

View File

@ -1,16 +1,22 @@
---
# Maximum connections pool size [default: 20]
pool_size: 20
# Database connection string
connection_string: "postgres://postgres@localhost/test"
# Connection keep alive timeout [default: 75]
keep_alive: 75
# The socket address to bind [default: 0.0.0.0:3000]
listen_addresses: "0.0.0.0:3000"
# Maximum connections pool size [default: 20]
pool_size: 20
# Enable watch mode
watch: false
# Number of web server workers
worker_processes: 8
# The socket address to bind [default: 0.0.0.0:3000]
listen_addresses: '0.0.0.0:3000'
# associative arrays of table sources
table_sources:
public.table_source: