From c55e61d27f62f2d23c7c6af4512a1a2f32dad282 Mon Sep 17 00:00:00 2001 From: Stepan Kuzmin Date: Wed, 25 Jul 2018 15:02:31 +0300 Subject: [PATCH] feat: add config support --- .gitignore | 2 + Cargo.lock | 28 +++++++++++++ Cargo.toml | 1 + src/config.rs | 85 ++++++++++++++++++++++++++++++++++++++++ src/db.rs | 2 +- src/main.rs | 55 +++++++------------------- src/scripts/get_tile.sql | 2 +- src/server.rs | 32 +++++++++++++++ src/source.rs | 52 +++++++++++++++--------- 9 files changed, 199 insertions(+), 60 deletions(-) create mode 100755 src/config.rs create mode 100755 src/server.rs diff --git a/.gitignore b/.gitignore index 77bceedc..e3be3888 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ /target/ **/*.rs.bk +db +config.yaml docker-compose.yml \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index c6245dac..831933b3 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -706,6 +706,11 @@ name = "linked-hash-map" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "linked-hash-map" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "lock_api" version = "0.1.3" @@ -757,6 +762,7 @@ dependencies = [ "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_yaml 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)", "tilejson 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1207,6 +1213,17 @@ dependencies = [ "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "serde_yaml" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "dtoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", + "yaml-rust 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "sha1" version = "0.6.0" @@ -1697,6 +1714,14 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "yaml-rust" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [metadata] "checksum actix 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28b4c697e6e63280f874c1d3e30c476de1cc589efd91b32b748a17808cb4adc3" "checksum actix-web 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1bf2bc88dea9c852b9e2fc1c509a422907e05e83d06bef510e8f493b3396f01a" @@ -1779,6 +1804,7 @@ dependencies = [ "checksum lazycell 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d33a48d0365c96081958cc663eef834975cb1e8d8bea3378513fc72bdbf11e50" "checksum libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)" = "b685088df2b950fccadf07a7187c8ef846a959c142338a48f9dc0b94517eb5f1" "checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939" +"checksum linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70fb39025bc7cdd76305867c4eccf2f2dcf6e9a57f5b21a93e1c2d86cd03ec9e" "checksum lock_api 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "949826a5ccf18c1b3a7c3d57692778d21768b79e46eb9dd07bfc4c2160036c54" "checksum log 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "61bd98ae7f7b754bc53dca7d44b604f733c6bba044ea6f41bc8d89272d8161d2" "checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21" @@ -1834,6 +1860,7 @@ dependencies = [ "checksum serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)" = "0c3adf19c07af6d186d91dae8927b83b0553d07ca56cbf7f2f32560455c91920" "checksum serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)" = "3525a779832b08693031b8ecfb0de81cd71cfd3812088fafe9a7496789572124" "checksum serde_json 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c6908c7b925cd6c590358a4034de93dbddb20c45e1d021931459fd419bf0e2" +"checksum serde_yaml 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8099d3df28273c99a1728190c7a9f19d444c941044f64adf986bee7ec53051" "checksum sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" "checksum sha2 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9eb6be24e4c23a84d7184280d2722f7f2731fcdd4a9d886efbfe4413e4847ea0" "checksum siphasher 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0df90a788073e8d0235a67e50441d47db7c8ad9debd91cbf43736a2a92d36537" @@ -1891,3 +1918,4 @@ dependencies = [ "checksum winreg 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a27a759395c1195c4cc5cda607ef6f8f6498f64e78f7900f5de0a127a424704a" "checksum winutil 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7daf138b6b14196e3830a588acf1e86966c694d3e8fb026fb105b8b5dca07e6e" "checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +"checksum yaml-rust 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "57ab38ee1a4a266ed033496cf9af1828d8d6e6c1cfa5f643a2809effcae4d628" diff --git a/Cargo.toml b/Cargo.toml index b07c10de..fb4ea165 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,4 +17,5 @@ r2d2_postgres = "0.14" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +serde_yaml = "0.7" tilejson = "0.1" \ No newline at end of file diff --git a/src/config.rs b/src/config.rs new file mode 100755 index 00000000..eae4f32b --- /dev/null +++ b/src/config.rs @@ -0,0 +1,85 @@ +use num_cpus; +use serde_yaml; +use std::error::Error; +use std::fs::File; +use std::io; +use std::io::prelude::*; +use std::path::Path; + +use super::db::PostgresConnection; +use super::source::{get_sources, Sources}; + +#[derive(Clone, Debug, Serialize)] +pub struct Config { + pub pool_size: u32, + pub keep_alive: usize, + pub worker_processes: usize, + pub listen_addresses: String, + pub sources: Sources, +} + +#[derive(Deserialize)] +pub struct ConfigBuilder { + pub pool_size: Option, + pub keep_alive: Option, + pub worker_processes: Option, + pub listen_addresses: Option, + pub sources: Sources, +} + +impl ConfigBuilder { + pub fn finalize(self) -> Config { + Config { + pool_size: self.pool_size.unwrap_or(20), + keep_alive: self.keep_alive.unwrap_or(75), + worker_processes: self.worker_processes.unwrap_or(num_cpus::get()), + listen_addresses: self.listen_addresses.unwrap_or("0.0.0.0:3000".to_string()), + sources: self.sources, + } + } +} + +pub fn build(config_filename: &str, conn: PostgresConnection) -> io::Result { + if Path::new(config_filename).exists() { + info!("Config found at {}", config_filename); + let config = read_config(config_filename)?; + return Ok(config); + }; + + let sources = get_sources(conn)?; + let config = generate_config(sources); + + // let _ = write_config(config_filename, config.clone()); + + Ok(config) +} + +pub fn generate_config(sources: Sources) -> Config { + let config = ConfigBuilder { + pool_size: None, + keep_alive: None, + worker_processes: None, + listen_addresses: None, + sources: sources, + }; + + config.finalize() +} + +// pub fn write_config(file_name: &str, config: Config) -> io::Result<()> { +// let mut file = File::create(file_name)?; +// let yaml = serde_yaml::to_string(&config).unwrap(); +// file.write_all(yaml.as_bytes())?; +// Ok(()) +// } + +pub fn read_config(file_name: &str) -> io::Result { + let mut file = File::open(file_name)?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + + let config: ConfigBuilder = serde_yaml::from_str(contents.as_str()) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?; + + Ok(config.finalize()) +} diff --git a/src/db.rs b/src/db.rs index b65ed742..f08b063a 100755 --- a/src/db.rs +++ b/src/db.rs @@ -1,6 +1,6 @@ use actix::prelude::*; -use r2d2_postgres::{PostgresConnectionManager, TlsMode}; use r2d2::{Pool, PooledConnection}; +use r2d2_postgres::{PostgresConnectionManager, TlsMode}; use std::error::Error; use std::io; diff --git a/src/main.rs b/src/main.rs index 4b6bc708..8ef853b1 100755 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ extern crate actix; extern crate actix_web; extern crate env_logger; extern crate futures; +extern crate postgres; #[macro_use] extern crate log; extern crate mapbox_expressions_to_sql; @@ -12,42 +13,31 @@ extern crate serde; #[macro_use] extern crate serde_derive; extern crate serde_json; +extern crate serde_yaml; extern crate tilejson; -use actix::{Actor, Addr, SyncArbiter}; -use actix_web::server; use std::env; use std::error::Error; use std::io; +mod config; mod coordinator_actor; mod db; mod martin; mod messages; +mod server; mod source; mod utils; mod worker_actor; +static CONFIG_FILENAME: &str = "config.yaml"; + fn main() { env_logger::init(); + let pool_size = 20; // TODO: get pool_size from config let conn_string: String = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let pool_size = env::var("DATABASE_POOL_SIZE") - .ok() - .and_then(|pool_size| pool_size.parse::().ok()) - .unwrap_or(20); - - let worker_processes = env::var("WORKER_PROCESSES") - .ok() - .and_then(|worker_processes| worker_processes.parse::().ok()) - .unwrap_or(num_cpus::get()); - - let keep_alive = env::var("KEEP_ALIVE") - .ok() - .and_then(|keep_alive| keep_alive.parse::().ok()) - .unwrap_or(75); - info!("Connecting to {}", conn_string); let pool = match db::setup_connection_pool(&conn_string, pool_size) { Ok(pool) => { @@ -60,36 +50,21 @@ fn main() { } }; - let sources = match pool.get() + let config = match pool.get() .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description())) - .and_then(|conn| source::get_sources(conn)) + .and_then(|conn| config::build(CONFIG_FILENAME, conn)) { - Ok(sources) => sources, + Ok(config) => config, Err(error) => { - error!("Can't load sources: {}", error); + error!("Can't build config: {}", error); std::process::exit(-1); } }; - let server = actix::System::new("server"); - let coordinator_addr: Addr<_> = coordinator_actor::CoordinatorActor::default().start(); - let db_sync_arbiter = SyncArbiter::start(3, move || db::DbExecutor(pool.clone())); - - let port = 3000; - let bind_addr = format!("0.0.0.0:{}", port); - let _addr = server::new(move || { - martin::new( - db_sync_arbiter.clone(), - coordinator_addr.clone(), - sources.clone(), - ) - }).bind(bind_addr.clone()) - .expect(&format!("Can't bind to {}", bind_addr)) - .keep_alive(keep_alive) - .shutdown_timeout(0) - .workers(worker_processes) - .start(); + let listen_addresses = config.listen_addresses.clone(); + let server = server::new(config, pool); let _ = server.run(); - info!("Server has been started on {}.", bind_addr); + + info!("Server has been started on {}.", listen_addresses); } diff --git a/src/scripts/get_tile.sql b/src/scripts/get_tile.sql index 99a317de..58dadf5d 100755 --- a/src/scripts/get_tile.sql +++ b/src/scripts/get_tile.sql @@ -1,5 +1,5 @@ WITH bounds AS (SELECT {mercator_bounds} as mercator, {original_bounds} as original) -SELECT ST_AsMVT(tile, '{id}', {extent}, 'geom') FROM ( +SELECT ST_AsMVT(tile, '{id}', {extent}, 'geom' {id_column}) FROM ( SELECT ST_AsMVTGeom({geometry_column_mercator}, bounds.mercator, {extent}, {buffer}, {clip_geom}) AS geom {properties} FROM {id}, bounds diff --git a/src/server.rs b/src/server.rs new file mode 100755 index 00000000..482b18cd --- /dev/null +++ b/src/server.rs @@ -0,0 +1,32 @@ +use actix::{Actor, Addr, SyncArbiter, System, SystemRunner}; +use actix_web::server; + +use super::config::Config; +use super::coordinator_actor; +use super::db; +use super::martin; + +pub fn new(config: Config, pool: db::PostgresPool) -> SystemRunner { + let server = System::new("server"); + let coordinator_addr: Addr<_> = coordinator_actor::CoordinatorActor::default().start(); + let db_sync_arbiter = SyncArbiter::start(3, move || db::DbExecutor(pool.clone())); + + let keep_alive = config.keep_alive; + let worker_processes = config.worker_processes; + let listen_addresses = config.listen_addresses.clone(); + + let _addr = server::new(move || { + martin::new( + db_sync_arbiter.clone(), + coordinator_addr.clone(), + config.sources.clone(), + ) + }).bind(listen_addresses.clone()) + .expect(&format!("Can't bind to {}", listen_addresses)) + .keep_alive(keep_alive) + .shutdown_timeout(0) + .workers(worker_processes) + .start(); + + server +} diff --git a/src/source.rs b/src/source.rs index 869f9b5e..aa0a9203 100755 --- a/src/source.rs +++ b/src/source.rs @@ -5,20 +5,28 @@ use std::io; use super::db::PostgresConnection; use super::utils; -#[derive(Clone, Debug, Serialize)] +static DEFAULT_ID_COLUMN: &str = "id"; +static DEFAULT_EXTENT: u32 = 4096; +static DEFAULT_BUFFER: u32 = 64; +static DEFAULT_CLIP_GEOM: bool = true; + +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Source { pub id: String, pub schema: String, pub table: String, + pub id_column: Option, pub geometry_column: String, pub srid: u32, - pub extent: u32, - pub buffer: u32, - pub clip_geom: bool, - pub geometry_type: String, + pub extent: Option, + pub buffer: Option, + pub clip_geom: Option, + pub geometry_type: Option, pub properties: HashMap, } +pub type Sources = HashMap; + pub type Tile = Vec; impl Source { @@ -53,16 +61,21 @@ impl Source { format!(", {0}", properties) }; + let id_column = self.id_column + .clone() + .map_or("".to_string(), |id_column| format!(", '{}'", id_column)); + let query = format!( include_str!("scripts/get_tile.sql"), id = self.id, + id_column = id_column, geometry_column = self.geometry_column, geometry_column_mercator = geometry_column_mercator, original_bounds = original_bounds, mercator_bounds = mercator_bounds, - extent = self.extent, - buffer = self.buffer, - clip_geom = self.clip_geom, + extent = self.extent.unwrap_or(DEFAULT_EXTENT), + buffer = self.buffer.unwrap_or(DEFAULT_BUFFER), + clip_geom = self.clip_geom.unwrap_or(DEFAULT_CLIP_GEOM), properties = properties, condition = condition.map_or("".to_string(), |condition| format!("AND {}", condition)), ); @@ -75,13 +88,7 @@ impl Source { } } -pub type Sources = HashMap; - pub fn get_sources(conn: PostgresConnection) -> Result { - let default_extent = 4096; - let default_buffer = 64; - let default_clip_geom = true; - let mut sources = HashMap::new(); let rows = conn.query(include_str!("scripts/get_sources.sql"), &[]) .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?; @@ -99,17 +106,26 @@ pub fn get_sources(conn: PostgresConnection) -> Result { continue; } + let properties = utils::json_to_hashmap(row.get("properties")); + + let id_column = if properties.contains_key(DEFAULT_ID_COLUMN) { + Some(DEFAULT_ID_COLUMN.to_string()) + } else { + None + }; + let source = Source { id: id.to_string(), schema: schema, table: table, + id_column: id_column, geometry_column: geometry_column, srid: srid as u32, - extent: default_extent, - buffer: default_buffer, - clip_geom: default_clip_geom, + extent: Some(DEFAULT_EXTENT), + buffer: Some(DEFAULT_BUFFER), + clip_geom: Some(DEFAULT_CLIP_GEOM), geometry_type: row.get("type"), - properties: utils::json_to_hashmap(row.get("properties")), + properties: properties, }; sources.insert(id, source);