feat: split sources into table_sources and function_sources

Stepan Kuzmin 2018-08-08 15:08:43 +03:00
parent 21f12e24fa
commit 3c3d88b184
11 changed files with 377 additions and 311 deletions
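Before the per-file diffs, here is a minimal, self-contained sketch of the trait-object pattern this commit introduces: table-backed and function-backed sources behind a common Source trait, kept in separate maps. The type and source names below (Xyz, public.points, public.function_source) are simplified placeholders rather than the actual crate code; the real trait's get_tile also takes a PostgresConnection and a query map, as the diffs show.

use std::collections::HashMap;

// Simplified stand-ins for the crate's types (assumed shapes, not the real code).
type Tile = Vec<u8>;

#[derive(Clone, Copy)]
struct Xyz {
    z: u32,
    x: u32,
    y: u32,
}

trait Source {
    fn get_id(&self) -> &str;
    fn get_tile(&self, xyz: Xyz) -> Tile;
}

struct TableSource {
    id: String,
}

struct FunctionSource {
    id: String,
}

impl Source for TableSource {
    fn get_id(&self) -> &str {
        &self.id
    }
    // The real TableSource builds an ST_AsMVT query against PostGIS here.
    fn get_tile(&self, _xyz: Xyz) -> Tile {
        Vec::new()
    }
}

impl Source for FunctionSource {
    fn get_id(&self) -> &str {
        &self.id
    }
    // FunctionSource::get_tile is still a stub in this commit as well.
    fn get_tile(&self, _xyz: Xyz) -> Tile {
        Vec::new()
    }
}

fn main() {
    // Both kinds of source can live behind the same trait object.
    let mut sources: HashMap<String, Box<dyn Source>> = HashMap::new();
    sources.insert(
        "public.points".to_owned(),
        Box::new(TableSource { id: "public.points".to_owned() }),
    );
    sources.insert(
        "public.function_source".to_owned(),
        Box::new(FunctionSource { id: "public.function_source".to_owned() }),
    );

    let xyz = Xyz { z: 0, x: 0, y: 0 };
    let tile = sources["public.points"].get_tile(xyz);
    println!("{}/{}/{} -> {} bytes", xyz.z, xyz.x, xyz.y, tile.len());
    println!("source id: {}", sources["public.points"].get_id());
}

Keeping table_sources and function_sources as separate Option fields on Config means either kind of source can be omitted from config.yaml without affecting the other.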

57 src/config.rs Executable file → Normal file

@@ -7,7 +7,8 @@ use std::io::prelude::*;
use std::path::Path;
use super::db::PostgresConnection;
use super::source::{get_sources, Sources};
use super::function_source::FunctionSources;
use super::table_source::{get_table_sources, TableSources};
#[derive(Clone, Debug, Serialize)]
pub struct Config {
@@ -15,16 +16,18 @@ pub struct Config {
pub keep_alive: usize,
pub worker_processes: usize,
pub listen_addresses: String,
pub sources: Sources,
pub table_sources: Option<TableSources>,
pub function_sources: Option<FunctionSources>,
}
#[derive(Deserialize)]
pub struct ConfigBuilder {
struct ConfigBuilder {
pub pool_size: Option<u32>,
pub keep_alive: Option<usize>,
pub worker_processes: Option<usize>,
pub listen_addresses: Option<String>,
pub sources: Sources,
pub table_sources: Option<TableSources>,
pub function_sources: Option<FunctionSources>,
}
impl ConfigBuilder {
@@ -32,35 +35,33 @@ impl ConfigBuilder {
Config {
pool_size: self.pool_size.unwrap_or(20),
keep_alive: self.keep_alive.unwrap_or(75),
worker_processes: self.worker_processes.unwrap_or(num_cpus::get()),
listen_addresses: self.listen_addresses.unwrap_or("0.0.0.0:3000".to_string()),
sources: self.sources,
worker_processes: self.worker_processes.unwrap_or_else(num_cpus::get),
listen_addresses: self.listen_addresses.unwrap_or("0.0.0.0:3000".to_owned()),
table_sources: self.table_sources,
function_sources: self.function_sources,
}
}
}
pub fn build(config_filename: &str, conn: PostgresConnection) -> io::Result<Config> {
if Path::new(config_filename).exists() {
info!("Config found at {}", config_filename);
let config = read_config(config_filename)?;
return Ok(config);
};
pub fn read_config(file_name: &str) -> io::Result<Config> {
let mut file = File::open(file_name)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let sources = get_sources(conn)?;
let config = generate_config(sources);
let config_builder: ConfigBuilder = serde_yaml::from_str(contents.as_str())
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
// let _ = write_config(config_filename, config.clone());
Ok(config)
Ok(config_builder.finalize())
}
pub fn generate_config(sources: Sources) -> Config {
fn generate_config(table_sources: TableSources) -> Config {
let config = ConfigBuilder {
pool_size: None,
keep_alive: None,
worker_processes: None,
listen_addresses: None,
sources: sources,
table_sources: Some(table_sources),
function_sources: None,
};
config.finalize()
@@ -73,13 +74,15 @@ pub fn generate_config(sources: Sources) -> Config {
// Ok(())
// }
pub fn read_config(file_name: &str) -> io::Result<Config> {
let mut file = File::open(file_name)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
pub fn build_config(config_filename: &str, conn: PostgresConnection) -> io::Result<Config> {
if Path::new(config_filename).exists() {
info!("Config found at {}", config_filename);
let config = read_config(config_filename)?;
return Ok(config);
};
let config: ConfigBuilder = serde_yaml::from_str(contents.as_str())
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
let table_sources = get_table_sources(conn)?;
let config = generate_config(table_sources);
Ok(config.finalize())
Ok(config)
}

src/db.rs

@@ -1,11 +1,6 @@
use actix::prelude::*;
use r2d2::{Pool, PooledConnection};
use r2d2_postgres::{PostgresConnectionManager, TlsMode};
use std::error::Error;
use std::io;
use super::messages;
use super::source::{get_sources, Sources, Tile};
pub type PostgresPool = Pool<PostgresConnectionManager>;
pub type PostgresConnection = PooledConnection<PostgresConnectionManager>;
@@ -15,33 +10,3 @@ pub fn setup_connection_pool(cn_str: &str, pool_size: u32) -> Result<PostgresPoo
let pool = Pool::builder().max_size(pool_size).build(manager)?;
Ok(pool)
}
#[derive(Debug)]
pub struct DbExecutor(pub PostgresPool);
impl Actor for DbExecutor {
type Context = SyncContext<Self>;
}
impl Handler<messages::GetSources> for DbExecutor {
type Result = Result<Sources, io::Error>;
fn handle(&mut self, _msg: messages::GetSources, _: &mut Self::Context) -> Self::Result {
let conn = self.0.get().unwrap();
let sources = get_sources(conn)?;
Ok(sources)
}
}
impl Handler<messages::GetTile> for DbExecutor {
type Result = Result<Tile, io::Error>;
fn handle(&mut self, msg: messages::GetTile, _: &mut Self::Context) -> Self::Result {
let conn = self.0.get().unwrap();
let tile = msg.source
.get_tile(conn, msg.z, msg.x, msg.y, msg.condition)?;
Ok(tile)
}
}

24 src/db_executor.rs Normal file

@@ -0,0 +1,24 @@
use actix::prelude::*;
use std::io;
use super::db::PostgresPool;
use super::messages;
use super::source::Tile;
pub struct DbExecutor(pub PostgresPool);
impl Actor for DbExecutor {
type Context = SyncContext<Self>;
}
impl Handler<messages::GetTile> for DbExecutor {
type Result = Result<Tile, io::Error>;
fn handle(&mut self, msg: messages::GetTile, _: &mut Self::Context) -> Self::Result {
let conn = self.0.get().unwrap();
let tile = msg.source.get_tile(conn, msg.xyz, msg.query)?;
Ok(tile)
}
}

28 src/function_source.rs Normal file

@@ -0,0 +1,28 @@
use std::collections::HashMap;
use std::io;
use super::db::PostgresConnection;
use super::martin::Query;
use super::source::{Source, Tile, XYZ};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FunctionSource {
id: String,
}
impl Source for FunctionSource {
fn get_id(self) -> String {
self.id
}
fn get_tile(
&self,
_conn: PostgresConnection,
_xyz: XYZ,
_query: Query,
) -> Result<Tile, io::Error> {
Ok(Vec::new())
}
}
pub type FunctionSources = HashMap<String, Box<FunctionSource>>;

37 src/main.rs Executable file → Normal file

@@ -1,12 +1,15 @@
// #![warn(clippy)]
extern crate actix;
extern crate actix_web;
extern crate env_logger;
extern crate futures;
extern crate postgres;
extern crate mapbox_expressions_to_sql;
extern crate tilejson;
#[macro_use]
extern crate log;
extern crate mapbox_expressions_to_sql;
extern crate num_cpus;
extern crate postgres;
extern crate r2d2;
extern crate r2d2_postgres;
extern crate serde;
@@ -14,21 +17,26 @@ extern crate serde;
extern crate serde_derive;
extern crate serde_json;
extern crate serde_yaml;
extern crate tilejson;
mod config;
mod db;
mod db_executor;
mod function_source;
mod martin;
mod messages;
mod server;
mod source;
mod table_source;
mod utils;
use std::env;
use std::error::Error;
use std::io;
mod config;
mod db;
mod martin;
mod messages;
mod server;
mod source;
mod utils;
use config::build_config;
use db::setup_connection_pool;
static CONFIG_FILENAME: &str = "config.yaml";
static DEFAULT_CONFIG_FILENAME: &str = "config.yaml";
fn main() {
env_logger::init();
@@ -37,7 +45,7 @@ fn main() {
let conn_string: String = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
info!("Connecting to {}", conn_string);
let pool = match db::setup_connection_pool(&conn_string, pool_size) {
let pool = match setup_connection_pool(&conn_string, pool_size) {
Ok(pool) => {
info!("Connected to postgres: {}", conn_string);
pool
@@ -48,9 +56,10 @@ fn main() {
}
};
let config = match pool.get()
let config = match pool
.get()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))
.and_then(|conn| config::build(CONFIG_FILENAME, conn))
.and_then(|conn| build_config(DEFAULT_CONFIG_FILENAME, conn))
{
Ok(config) => config,
Err(error) => {

src/martin.rs

@@ -1,35 +1,44 @@
use actix::*;
use actix_web::*;
use futures::future::Future;
use mapbox_expressions_to_sql;
use std::collections::HashMap;
use tilejson::TileJSONBuilder;
use super::db::DbExecutor;
use super::config::Config;
use super::db_executor::DbExecutor;
// use super::function_source::FunctionSources;
use super::messages;
use super::source::Sources;
use super::table_source::TableSources;
use super::utils::parse_xyz;
pub type Query = HashMap<String, String>;
pub struct State {
db: Addr<DbExecutor>,
sources: Sources,
table_sources: Option<TableSources>,
// function_sources: Option<FunctionSources>,
}
// TODO: Swagger endpoint
fn index(req: &HttpRequest<State>) -> Result<HttpResponse> {
let sources = &req.state().sources;
let table_sources = &req.state().table_sources.clone().unwrap();
Ok(HttpResponse::Ok()
.header("Access-Control-Allow-Origin", "*")
.json(sources))
.json(table_sources))
}
fn source(req: &HttpRequest<State>) -> Result<HttpResponse> {
let source_ids = req.match_info()
.get("sources")
let source_id = req
.match_info()
.get("source_id")
.ok_or(error::ErrorBadRequest("invalid source"))?;
let path = req.headers()
let path = req
.headers()
.get("x-rewrite-url")
.map_or(String::from(source_ids), |header| {
let parts: Vec<&str> = header.to_str().unwrap().split(".").collect();
.map_or(String::from(source_id), |header| {
let parts: Vec<&str> = header.to_str().unwrap().split('.').collect();
let (_, parts_without_extension) = parts.split_last().unwrap();
let path_without_extension = parts_without_extension.join(".");
let (_, path_without_leading_slash) = path_without_extension.split_at(1);
@@ -47,7 +56,7 @@ fn source(req: &HttpRequest<State>) -> Result<HttpResponse> {
let mut tilejson_builder = TileJSONBuilder::new();
tilejson_builder.scheme("tms");
tilejson_builder.name(&source_ids);
tilejson_builder.name(&source_id);
tilejson_builder.tiles(vec![&tiles_url]);
let tilejson = tilejson_builder.finalize();
@@ -57,10 +66,12 @@ fn source(req: &HttpRequest<State>) -> Result<HttpResponse> {
}
fn tile(req: &HttpRequest<State>) -> Result<Box<Future<Item = HttpResponse, Error = Error>>> {
let sources = &req.state().sources;
let sources = &req.state().table_sources.clone().unwrap();
let params = req.match_info();
let query = req.query();
let source_id = req.match_info()
.get("sources")
let source_id = params
.get("source_id")
.ok_or(error::ErrorBadRequest("invalid source"))?;
let source = sources.get(source_id).ok_or(error::ErrorNotFound(format!(
@@ -68,33 +79,16 @@ fn tile(req: &HttpRequest<State>) -> Result<Box<Future<Item = HttpResponse, Erro
source_id
)))?;
let z = req.match_info()
.get("z")
.and_then(|i| i.parse::<u32>().ok())
.ok_or(error::ErrorBadRequest("invalid z"))?;
let xyz = parse_xyz(params)
.map_err(|e| error::ErrorBadRequest(format!("Can't parse XYZ scheme: {}", e)))?;
let x = req.match_info()
.get("x")
.and_then(|i| i.parse::<u32>().ok())
.ok_or(error::ErrorBadRequest("invalid x"))?;
let y = req.match_info()
.get("y")
.and_then(|i| i.parse::<u32>().ok())
.ok_or(error::ErrorBadRequest("invalid y"))?;
let condition = req.query()
.get("filter")
.and_then(|filter| mapbox_expressions_to_sql::parse(filter).ok());
Ok(req.state()
Ok(req
.state()
.db
.send(messages::GetTile {
z: z,
x: x,
y: y,
xyz: xyz,
query: query.clone(),
source: source.clone(),
condition: condition,
})
.from_err()
.and_then(|res| match res {
@@ -113,17 +107,20 @@ fn tile(req: &HttpRequest<State>) -> Result<Box<Future<Item = HttpResponse, Erro
.responder())
}
pub fn new(db_sync_arbiter: Addr<DbExecutor>, sources: Sources) -> App<State> {
pub fn new(db_sync_arbiter: Addr<DbExecutor>, config: Config) -> App<State> {
let state = State {
db: db_sync_arbiter,
sources: sources,
table_sources: config.table_sources,
// function_sources: config.function_sources,
};
App::with_state(state)
.middleware(middleware::Logger::default())
.resource("/index.json", |r| r.method(http::Method::GET).f(index))
.resource("/{sources}.json", |r| r.method(http::Method::GET).f(source))
.resource("/{sources}/{z}/{x}/{y}.pbf", |r| {
.resource("/{source_id}.json", |r| {
r.method(http::Method::GET).f(source)
})
.resource("/{source_id}/{z}/{x}/{y}.pbf", |r| {
r.method(http::Method::GET).f(tile)
})
}

src/messages.rs

@@ -1,20 +1,13 @@
use actix::prelude::*;
use std::io;
use super::source::{Source, Sources, Tile};
pub struct GetSources {}
impl Message for GetSources {
type Result = Result<Sources, io::Error>;
}
use super::martin::Query;
use super::source::{Source, Tile, XYZ};
pub struct GetTile {
pub z: u32,
pub x: u32,
pub y: u32,
pub source: Source,
pub condition: Option<String>,
pub xyz: XYZ,
pub query: Query,
pub source: Box<dyn Source + Send>,
}
impl Message for GetTile {

src/server.rs

@@ -2,18 +2,19 @@ use actix::{SyncArbiter, System, SystemRunner};
use actix_web::server;
use super::config::Config;
use super::db;
use super::db::PostgresPool;
use super::db_executor::DbExecutor;
use super::martin;
pub fn new(config: Config, pool: db::PostgresPool) -> SystemRunner {
pub fn new(config: Config, pool: PostgresPool) -> SystemRunner {
let server = System::new("server");
let db_sync_arbiter = SyncArbiter::start(3, move || db::DbExecutor(pool.clone()));
let db_sync_arbiter = SyncArbiter::start(3, move || DbExecutor(pool.clone()));
let keep_alive = config.keep_alive;
let worker_processes = config.worker_processes;
let listen_addresses = config.listen_addresses.clone();
let _addr = server::new(move || martin::new(db_sync_arbiter.clone(), config.sources.clone()))
let _addr = server::new(move || martin::new(db_sync_arbiter.clone(), config.clone()))
.bind(listen_addresses.clone())
.expect(&format!("Can't bind to {}", listen_addresses))
.keep_alive(keep_alive)

139 src/source.rs Executable file → Normal file

@@ -1,136 +1,21 @@
use std::collections::HashMap;
use std::error::Error;
// use std::collections::HashMap;
use std::fmt::Debug;
use std::io;
use super::db::PostgresConnection;
use super::utils;
// static DEFAULT_ID_COLUMN: &str = "id";
static DEFAULT_EXTENT: u32 = 4096;
static DEFAULT_BUFFER: u32 = 64;
static DEFAULT_CLIP_GEOM: bool = true;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Source {
pub id: String,
pub schema: String,
pub table: String,
pub id_column: Option<String>,
pub geometry_column: String,
pub srid: u32,
pub extent: Option<u32>,
pub buffer: Option<u32>,
pub clip_geom: Option<bool>,
pub geometry_type: Option<String>,
pub properties: HashMap<String, String>,
}
pub type Sources = HashMap<String, Source>;
use super::martin::Query;
pub type Tile = Vec<u8>;
impl Source {
pub fn get_tile(
&self,
conn: PostgresConnection,
z: u32,
x: u32,
y: u32,
condition: Option<String>,
) -> Result<Tile, io::Error> {
let mercator_bounds = utils::tilebbox(z, x, y);
let (geometry_column_mercator, original_bounds) = if self.srid == 3857 {
(self.geometry_column.clone(), mercator_bounds.clone())
} else {
(
format!("ST_Transform({0}, 3857)", self.geometry_column),
format!("ST_Transform({0}, {1})", mercator_bounds, self.srid),
)
};
let properties = if self.properties.is_empty() {
"".to_string()
} else {
let properties = self.properties
.keys()
.map(|column| format!("\"{0}\"", column))
.collect::<Vec<String>>()
.join(",");
format!(", {0}", properties)
};
let id_column = self.id_column
.clone()
.map_or("".to_string(), |id_column| format!(", '{}'", id_column));
let query = format!(
include_str!("scripts/get_tile.sql"),
id = self.id,
id_column = id_column,
geometry_column = self.geometry_column,
geometry_column_mercator = geometry_column_mercator,
original_bounds = original_bounds,
mercator_bounds = mercator_bounds,
extent = self.extent.unwrap_or(DEFAULT_EXTENT),
buffer = self.buffer.unwrap_or(DEFAULT_BUFFER),
clip_geom = self.clip_geom.unwrap_or(DEFAULT_CLIP_GEOM),
properties = properties,
condition = condition.map_or("".to_string(), |condition| format!("AND {}", condition)),
);
let tile: Tile = conn.query(&query, &[])
.map(|rows| rows.get(0).get("st_asmvt"))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
Ok(tile)
}
pub struct XYZ {
pub z: u32,
pub x: u32,
pub y: u32,
}
pub fn get_sources(conn: PostgresConnection) -> Result<Sources, io::Error> {
let mut sources = HashMap::new();
let rows = conn.query(include_str!("scripts/get_sources.sql"), &[])
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
for row in &rows {
let schema: String = row.get("f_table_schema");
let table: String = row.get("f_table_name");
let id = format!("{}.{}", schema, table);
let geometry_column: String = row.get("f_geometry_column");
let srid: i32 = row.get("srid");
if srid == 0 {
warn!("{} has SRID 0, skipping", id);
continue;
}
let properties = utils::json_to_hashmap(row.get("properties"));
let id_column = None;
// let id_column = if properties.contains_key(DEFAULT_ID_COLUMN) {
// Some(DEFAULT_ID_COLUMN.to_string())
// } else {
// None
// };
let source = Source {
id: id.to_string(),
schema: schema,
table: table,
id_column: id_column,
geometry_column: geometry_column,
srid: srid as u32,
extent: Some(DEFAULT_EXTENT),
buffer: Some(DEFAULT_BUFFER),
clip_geom: Some(DEFAULT_CLIP_GEOM),
geometry_type: row.get("type"),
properties: properties,
};
sources.insert(id, source);
}
Ok(sources)
pub trait Source: Debug {
fn get_id(self) -> String;
fn get_tile(&self, conn: PostgresConnection, xyz: XYZ, query: Query) -> Result<Tile, io::Error>;
}
// pub type Sources = HashMap<String, Box<dyn Source>>;

135 src/table_source.rs Normal file

@@ -0,0 +1,135 @@
use mapbox_expressions_to_sql;
use std::collections::HashMap;
use std::error::Error;
use std::io;
use super::db::PostgresConnection;
use super::martin::Query;
use super::source::{Source, Tile, XYZ};
use super::utils;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TableSource {
pub id: String,
pub schema: String,
pub table: String,
pub id_column: Option<String>,
pub geometry_column: String,
pub srid: u32,
pub extent: Option<u32>,
pub buffer: Option<u32>,
pub clip_geom: Option<bool>,
pub geometry_type: Option<String>,
pub properties: HashMap<String, String>,
}
pub type TableSources = HashMap<String, Box<TableSource>>;
impl Source for TableSource {
fn get_id(self) -> String {
self.id
}
fn get_tile(&self, conn: PostgresConnection, xyz: XYZ, query: Query) -> Result<Tile, io::Error> {
let mercator_bounds = utils::tilebbox(xyz);
let (geometry_column_mercator, original_bounds) = if self.srid == 3857 {
(self.geometry_column.clone(), mercator_bounds.clone())
} else {
(
format!("ST_Transform({0}, 3857)", self.geometry_column),
format!("ST_Transform({0}, {1})", mercator_bounds, self.srid),
)
};
let properties = if self.properties.is_empty() {
"".to_string()
} else {
let properties = self
.properties
.keys()
.map(|column| format!("\"{0}\"", column))
.collect::<Vec<String>>()
.join(",");
format!(", {0}", properties)
};
let id_column = self
.id_column
.clone()
.map_or("".to_string(), |id_column| format!(", '{}'", id_column));
let condition = query
.get("filter")
.and_then(|filter| mapbox_expressions_to_sql::parse(filter).ok());
let query = format!(
include_str!("scripts/get_tile.sql"),
id = self.id,
id_column = id_column,
geometry_column = self.geometry_column,
geometry_column_mercator = geometry_column_mercator,
original_bounds = original_bounds,
mercator_bounds = mercator_bounds,
extent = self.extent.unwrap_or(DEFAULT_EXTENT),
buffer = self.buffer.unwrap_or(DEFAULT_BUFFER),
clip_geom = self.clip_geom.unwrap_or(DEFAULT_CLIP_GEOM),
properties = properties,
condition = condition.map_or("".to_string(), |condition| format!("AND {}", condition)),
);
let tile: Tile = conn
.query(&query, &[])
.map(|rows| rows.get(0).get("st_asmvt"))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
Ok(tile)
}
}
static DEFAULT_EXTENT: u32 = 4096;
static DEFAULT_BUFFER: u32 = 64;
static DEFAULT_CLIP_GEOM: bool = true;
pub fn get_table_sources(conn: PostgresConnection) -> Result<TableSources, io::Error> {
let mut sources = HashMap::new();
let rows = conn
.query(include_str!("scripts/get_sources.sql"), &[])
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
for row in &rows {
let schema: String = row.get("f_table_schema");
let table: String = row.get("f_table_name");
let id = format!("{}.{}", schema, table);
let geometry_column: String = row.get("f_geometry_column");
let srid: i32 = row.get("srid");
if srid == 0 {
warn!("{} has SRID 0, skipping", id);
continue;
}
let properties = utils::json_to_hashmap(row.get("properties"));
let source = TableSource {
id: id.to_string(),
schema: schema,
table: table,
id_column: None,
geometry_column: geometry_column,
srid: srid as u32,
extent: Some(DEFAULT_EXTENT),
buffer: Some(DEFAULT_BUFFER),
clip_geom: Some(DEFAULT_CLIP_GEOM),
geometry_type: row.get("type"),
properties: properties,
};
sources.insert(id, Box::new(source));
}
Ok(sources)
}

src/utils.rs

@@ -1,8 +1,34 @@
use actix_web::dev::Params;
use serde_json;
use std::collections::HashMap;
use super::source::XYZ;
pub fn parse_xyz(params: &Params) -> Result<XYZ, &str> {
let z = params
.get("z")
.and_then(|i| i.parse::<u32>().ok())
.ok_or("invalid z value")?;
let x = params
.get("x")
.and_then(|i| i.parse::<u32>().ok())
.ok_or("invalid x value")?;
let y = params
.get("y")
.and_then(|i| i.parse::<u32>().ok())
.ok_or("invalid y value")?;
Ok(XYZ { x, y, z })
}
// https://github.com/mapbox/postgis-vt-util/blob/master/src/TileBBox.sql
pub fn tilebbox(z: u32, x: u32, y: u32) -> String {
pub fn tilebbox(xyz: XYZ) -> String {
let x = xyz.x;
let y = xyz.y;
let z = xyz.z;
let max = 20037508.34;
let res = (max * 2.0) / (2_i32.pow(z) as f64);