feat: add config support

This commit is contained in:
Stepan Kuzmin 2018-07-25 15:02:31 +03:00
parent 5532520f42
commit c55e61d27f
9 changed files with 199 additions and 60 deletions

2
.gitignore vendored
View File

@ -1,4 +1,6 @@
/target/
**/*.rs.bk
db
config.yaml
docker-compose.yml

28
Cargo.lock generated
View File

@ -706,6 +706,11 @@ name = "linked-hash-map"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "linked-hash-map"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "lock_api"
version = "0.1.3"
@ -757,6 +762,7 @@ dependencies = [
"serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_yaml 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tilejson 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1207,6 +1213,17 @@ dependencies = [
"serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "serde_yaml"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"dtoa 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"yaml-rust 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "sha1"
version = "0.6.0"
@ -1697,6 +1714,14 @@ dependencies = [
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "yaml-rust"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[metadata]
"checksum actix 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28b4c697e6e63280f874c1d3e30c476de1cc589efd91b32b748a17808cb4adc3"
"checksum actix-web 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1bf2bc88dea9c852b9e2fc1c509a422907e05e83d06bef510e8f493b3396f01a"
@ -1779,6 +1804,7 @@ dependencies = [
"checksum lazycell 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d33a48d0365c96081958cc663eef834975cb1e8d8bea3378513fc72bdbf11e50"
"checksum libc 0.2.42 (registry+https://github.com/rust-lang/crates.io-index)" = "b685088df2b950fccadf07a7187c8ef846a959c142338a48f9dc0b94517eb5f1"
"checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939"
"checksum linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70fb39025bc7cdd76305867c4eccf2f2dcf6e9a57f5b21a93e1c2d86cd03ec9e"
"checksum lock_api 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "949826a5ccf18c1b3a7c3d57692778d21768b79e46eb9dd07bfc4c2160036c54"
"checksum log 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "61bd98ae7f7b754bc53dca7d44b604f733c6bba044ea6f41bc8d89272d8161d2"
"checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21"
@ -1834,6 +1860,7 @@ dependencies = [
"checksum serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)" = "0c3adf19c07af6d186d91dae8927b83b0553d07ca56cbf7f2f32560455c91920"
"checksum serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)" = "3525a779832b08693031b8ecfb0de81cd71cfd3812088fafe9a7496789572124"
"checksum serde_json 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c6908c7b925cd6c590358a4034de93dbddb20c45e1d021931459fd419bf0e2"
"checksum serde_yaml 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8099d3df28273c99a1728190c7a9f19d444c941044f64adf986bee7ec53051"
"checksum sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d"
"checksum sha2 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9eb6be24e4c23a84d7184280d2722f7f2731fcdd4a9d886efbfe4413e4847ea0"
"checksum siphasher 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0df90a788073e8d0235a67e50441d47db7c8ad9debd91cbf43736a2a92d36537"
@ -1891,3 +1918,4 @@ dependencies = [
"checksum winreg 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a27a759395c1195c4cc5cda607ef6f8f6498f64e78f7900f5de0a127a424704a"
"checksum winutil 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7daf138b6b14196e3830a588acf1e86966c694d3e8fb026fb105b8b5dca07e6e"
"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
"checksum yaml-rust 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "57ab38ee1a4a266ed033496cf9af1828d8d6e6c1cfa5f643a2809effcae4d628"

View File

@ -17,4 +17,5 @@ r2d2_postgres = "0.14"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
serde_yaml = "0.7"
tilejson = "0.1"

85
src/config.rs Executable file
View File

@ -0,0 +1,85 @@
use num_cpus;
use serde_yaml;
use std::error::Error;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::path::Path;
use super::db::PostgresConnection;
use super::source::{get_sources, Sources};
#[derive(Clone, Debug, Serialize)]
pub struct Config {
pub pool_size: u32,
pub keep_alive: usize,
pub worker_processes: usize,
pub listen_addresses: String,
pub sources: Sources,
}
#[derive(Deserialize)]
pub struct ConfigBuilder {
pub pool_size: Option<u32>,
pub keep_alive: Option<usize>,
pub worker_processes: Option<usize>,
pub listen_addresses: Option<String>,
pub sources: Sources,
}
impl ConfigBuilder {
pub fn finalize(self) -> Config {
Config {
pool_size: self.pool_size.unwrap_or(20),
keep_alive: self.keep_alive.unwrap_or(75),
worker_processes: self.worker_processes.unwrap_or(num_cpus::get()),
listen_addresses: self.listen_addresses.unwrap_or("0.0.0.0:3000".to_string()),
sources: self.sources,
}
}
}
pub fn build(config_filename: &str, conn: PostgresConnection) -> io::Result<Config> {
if Path::new(config_filename).exists() {
info!("Config found at {}", config_filename);
let config = read_config(config_filename)?;
return Ok(config);
};
let sources = get_sources(conn)?;
let config = generate_config(sources);
// let _ = write_config(config_filename, config.clone());
Ok(config)
}
pub fn generate_config(sources: Sources) -> Config {
let config = ConfigBuilder {
pool_size: None,
keep_alive: None,
worker_processes: None,
listen_addresses: None,
sources: sources,
};
config.finalize()
}
// pub fn write_config(file_name: &str, config: Config) -> io::Result<()> {
// let mut file = File::create(file_name)?;
// let yaml = serde_yaml::to_string(&config).unwrap();
// file.write_all(yaml.as_bytes())?;
// Ok(())
// }
pub fn read_config(file_name: &str) -> io::Result<Config> {
let mut file = File::open(file_name)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let config: ConfigBuilder = serde_yaml::from_str(contents.as_str())
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
Ok(config.finalize())
}

View File

@ -1,6 +1,6 @@
use actix::prelude::*;
use r2d2_postgres::{PostgresConnectionManager, TlsMode};
use r2d2::{Pool, PooledConnection};
use r2d2_postgres::{PostgresConnectionManager, TlsMode};
use std::error::Error;
use std::io;

View File

@ -2,6 +2,7 @@ extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate futures;
extern crate postgres;
#[macro_use]
extern crate log;
extern crate mapbox_expressions_to_sql;
@ -12,42 +13,31 @@ extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate serde_yaml;
extern crate tilejson;
use actix::{Actor, Addr, SyncArbiter};
use actix_web::server;
use std::env;
use std::error::Error;
use std::io;
mod config;
mod coordinator_actor;
mod db;
mod martin;
mod messages;
mod server;
mod source;
mod utils;
mod worker_actor;
static CONFIG_FILENAME: &str = "config.yaml";
fn main() {
env_logger::init();
let pool_size = 20; // TODO: get pool_size from config
let conn_string: String = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let pool_size = env::var("DATABASE_POOL_SIZE")
.ok()
.and_then(|pool_size| pool_size.parse::<u32>().ok())
.unwrap_or(20);
let worker_processes = env::var("WORKER_PROCESSES")
.ok()
.and_then(|worker_processes| worker_processes.parse::<usize>().ok())
.unwrap_or(num_cpus::get());
let keep_alive = env::var("KEEP_ALIVE")
.ok()
.and_then(|keep_alive| keep_alive.parse::<usize>().ok())
.unwrap_or(75);
info!("Connecting to {}", conn_string);
let pool = match db::setup_connection_pool(&conn_string, pool_size) {
Ok(pool) => {
@ -60,36 +50,21 @@ fn main() {
}
};
let sources = match pool.get()
let config = match pool.get()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))
.and_then(|conn| source::get_sources(conn))
.and_then(|conn| config::build(CONFIG_FILENAME, conn))
{
Ok(sources) => sources,
Ok(config) => config,
Err(error) => {
error!("Can't load sources: {}", error);
error!("Can't build config: {}", error);
std::process::exit(-1);
}
};
let server = actix::System::new("server");
let coordinator_addr: Addr<_> = coordinator_actor::CoordinatorActor::default().start();
let db_sync_arbiter = SyncArbiter::start(3, move || db::DbExecutor(pool.clone()));
let port = 3000;
let bind_addr = format!("0.0.0.0:{}", port);
let _addr = server::new(move || {
martin::new(
db_sync_arbiter.clone(),
coordinator_addr.clone(),
sources.clone(),
)
}).bind(bind_addr.clone())
.expect(&format!("Can't bind to {}", bind_addr))
.keep_alive(keep_alive)
.shutdown_timeout(0)
.workers(worker_processes)
.start();
let listen_addresses = config.listen_addresses.clone();
let server = server::new(config, pool);
let _ = server.run();
info!("Server has been started on {}.", bind_addr);
info!("Server has been started on {}.", listen_addresses);
}

View File

@ -1,5 +1,5 @@
WITH bounds AS (SELECT {mercator_bounds} as mercator, {original_bounds} as original)
SELECT ST_AsMVT(tile, '{id}', {extent}, 'geom') FROM (
SELECT ST_AsMVT(tile, '{id}', {extent}, 'geom' {id_column}) FROM (
SELECT
ST_AsMVTGeom({geometry_column_mercator}, bounds.mercator, {extent}, {buffer}, {clip_geom}) AS geom {properties}
FROM {id}, bounds

32
src/server.rs Executable file
View File

@ -0,0 +1,32 @@
use actix::{Actor, Addr, SyncArbiter, System, SystemRunner};
use actix_web::server;
use super::config::Config;
use super::coordinator_actor;
use super::db;
use super::martin;
pub fn new(config: Config, pool: db::PostgresPool) -> SystemRunner {
let server = System::new("server");
let coordinator_addr: Addr<_> = coordinator_actor::CoordinatorActor::default().start();
let db_sync_arbiter = SyncArbiter::start(3, move || db::DbExecutor(pool.clone()));
let keep_alive = config.keep_alive;
let worker_processes = config.worker_processes;
let listen_addresses = config.listen_addresses.clone();
let _addr = server::new(move || {
martin::new(
db_sync_arbiter.clone(),
coordinator_addr.clone(),
config.sources.clone(),
)
}).bind(listen_addresses.clone())
.expect(&format!("Can't bind to {}", listen_addresses))
.keep_alive(keep_alive)
.shutdown_timeout(0)
.workers(worker_processes)
.start();
server
}

View File

@ -5,20 +5,28 @@ use std::io;
use super::db::PostgresConnection;
use super::utils;
#[derive(Clone, Debug, Serialize)]
static DEFAULT_ID_COLUMN: &str = "id";
static DEFAULT_EXTENT: u32 = 4096;
static DEFAULT_BUFFER: u32 = 64;
static DEFAULT_CLIP_GEOM: bool = true;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Source {
pub id: String,
pub schema: String,
pub table: String,
pub id_column: Option<String>,
pub geometry_column: String,
pub srid: u32,
pub extent: u32,
pub buffer: u32,
pub clip_geom: bool,
pub geometry_type: String,
pub extent: Option<u32>,
pub buffer: Option<u32>,
pub clip_geom: Option<bool>,
pub geometry_type: Option<String>,
pub properties: HashMap<String, String>,
}
pub type Sources = HashMap<String, Source>;
pub type Tile = Vec<u8>;
impl Source {
@ -53,16 +61,21 @@ impl Source {
format!(", {0}", properties)
};
let id_column = self.id_column
.clone()
.map_or("".to_string(), |id_column| format!(", '{}'", id_column));
let query = format!(
include_str!("scripts/get_tile.sql"),
id = self.id,
id_column = id_column,
geometry_column = self.geometry_column,
geometry_column_mercator = geometry_column_mercator,
original_bounds = original_bounds,
mercator_bounds = mercator_bounds,
extent = self.extent,
buffer = self.buffer,
clip_geom = self.clip_geom,
extent = self.extent.unwrap_or(DEFAULT_EXTENT),
buffer = self.buffer.unwrap_or(DEFAULT_BUFFER),
clip_geom = self.clip_geom.unwrap_or(DEFAULT_CLIP_GEOM),
properties = properties,
condition = condition.map_or("".to_string(), |condition| format!("AND {}", condition)),
);
@ -75,13 +88,7 @@ impl Source {
}
}
pub type Sources = HashMap<String, Source>;
pub fn get_sources(conn: PostgresConnection) -> Result<Sources, io::Error> {
let default_extent = 4096;
let default_buffer = 64;
let default_clip_geom = true;
let mut sources = HashMap::new();
let rows = conn.query(include_str!("scripts/get_sources.sql"), &[])
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
@ -99,17 +106,26 @@ pub fn get_sources(conn: PostgresConnection) -> Result<Sources, io::Error> {
continue;
}
let properties = utils::json_to_hashmap(row.get("properties"));
let id_column = if properties.contains_key(DEFAULT_ID_COLUMN) {
Some(DEFAULT_ID_COLUMN.to_string())
} else {
None
};
let source = Source {
id: id.to_string(),
schema: schema,
table: table,
id_column: id_column,
geometry_column: geometry_column,
srid: srid as u32,
extent: default_extent,
buffer: default_buffer,
clip_geom: default_clip_geom,
extent: Some(DEFAULT_EXTENT),
buffer: Some(DEFAULT_BUFFER),
clip_geom: Some(DEFAULT_CLIP_GEOM),
geometry_type: row.get("type"),
properties: utils::json_to_hashmap(row.get("properties")),
properties: properties,
};
sources.insert(id, source);