mirror of
https://github.com/maplibre/martin.git
synced 2024-12-18 12:21:56 +03:00
feat: upgrade actix-web to 1.0 (#33)
* test: 💍 add function sources tests * test: 💍 limit postgres pool size in tests * feat: 🎸 upgrade actix-web to 1.0
This commit is contained in:
parent
782bb8f1bd
commit
5a807e40e2
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@ -194,7 +194,7 @@ jobs:
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
with:
|
||||
draft: true
|
||||
files: 'martin*'
|
||||
files: "martin*"
|
||||
body_path: CHANGELOG.md
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
1108
Cargo.lock
generated
1108
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
23
Cargo.toml
23
Cargo.toml
@ -2,10 +2,20 @@
|
||||
name = "martin"
|
||||
version = "0.4.1"
|
||||
authors = ["Stepan Kuzmin <to.stepan.kuzmin@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[lib]
|
||||
name = "martin"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "martin"
|
||||
path = "src/bin/main.rs"
|
||||
|
||||
[dependencies]
|
||||
actix = "0.7"
|
||||
actix-web = "0.7"
|
||||
actix = "0.8"
|
||||
actix-rt = "0.2"
|
||||
actix-web = "1.0"
|
||||
docopt = "1"
|
||||
env_logger = "0.7"
|
||||
futures = "0.1"
|
||||
@ -19,4 +29,11 @@ serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
serde_yaml = "0.8"
|
||||
tilejson = "0.2"
|
||||
tilejson = "0.2"
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = "0.3"
|
||||
|
||||
[[bench]]
|
||||
name = "server"
|
||||
harness = false
|
37
benches/server.rs
Normal file
37
benches/server.rs
Normal file
@ -0,0 +1,37 @@
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
|
||||
use actix_web::dev::Service;
|
||||
use actix_web::{test, App};
|
||||
|
||||
use martin::dev::{mock_function_sources, mock_state, mock_table_sources};
|
||||
use martin::server::router;
|
||||
|
||||
fn criterion_benchmark(c: &mut Criterion) {
|
||||
let state = test::run_on(|| mock_state(mock_table_sources(), mock_function_sources()));
|
||||
let mut app = test::init_service(App::new().data(state).configure(router));
|
||||
|
||||
c.bench_function("/public.table_source/0/0/0.pbf", |b| {
|
||||
b.iter(|| {
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.table_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
|
||||
let future = test::run_on(|| app.call(req));
|
||||
let _response = test::block_on(future).unwrap();
|
||||
})
|
||||
});
|
||||
|
||||
c.bench_function("/rpc/public.function_source/0/0/0.pbf", |b| {
|
||||
b.iter(|| {
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.function_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
|
||||
let future = test::run_on(|| app.call(req));
|
||||
let _response = test::block_on(future).unwrap();
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
criterion_main!(benches);
|
564
src/app.rs
564
src/app.rs
@ -1,564 +0,0 @@
|
||||
use actix::*;
|
||||
use actix_web::*;
|
||||
use futures::future::{result, Future};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::rc::Rc;
|
||||
|
||||
use super::coordinator_actor::CoordinatorActor;
|
||||
use super::db_executor::DbExecutor;
|
||||
use super::function_source::FunctionSources;
|
||||
use super::messages;
|
||||
use super::table_source::TableSources;
|
||||
use super::utils::{build_tilejson, parse_xyz};
|
||||
use super::worker_actor::WorkerActor;
|
||||
|
||||
pub type Query = HashMap<String, String>;
|
||||
|
||||
pub struct State {
|
||||
db: Addr<DbExecutor>,
|
||||
coordinator: Addr<CoordinatorActor>,
|
||||
table_sources: Rc<RefCell<Option<TableSources>>>,
|
||||
function_sources: Rc<RefCell<Option<FunctionSources>>>,
|
||||
watch_mode: bool,
|
||||
}
|
||||
|
||||
fn get_table_sources(
|
||||
req: &HttpRequest<State>,
|
||||
) -> Result<Box<dyn Future<Item = HttpResponse, Error = Error>>> {
|
||||
let state = &req.state();
|
||||
if state.watch_mode {
|
||||
info!("Scanning database for table sources");
|
||||
let coordinator = state.coordinator.clone();
|
||||
let result = req.state().db.send(messages::GetTableSources {});
|
||||
|
||||
let response = result
|
||||
.from_err()
|
||||
.and_then(move |res| match res {
|
||||
Ok(table_sources) => {
|
||||
coordinator.do_send(messages::RefreshTableSources {
|
||||
table_sources: Some(table_sources.clone()),
|
||||
});
|
||||
|
||||
Ok(HttpResponse::Ok()
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.json(table_sources))
|
||||
}
|
||||
Err(_) => Ok(HttpResponse::InternalServerError().into()),
|
||||
})
|
||||
.responder();
|
||||
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
let table_sources = state
|
||||
.table_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
|
||||
|
||||
let http_response = HttpResponse::Ok()
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.json(table_sources);
|
||||
|
||||
let response = result(Ok(http_response)).responder();
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn get_table_source(req: &HttpRequest<State>) -> Result<HttpResponse> {
|
||||
let state = &req.state();
|
||||
|
||||
let table_sources = state
|
||||
.table_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
|
||||
|
||||
let params = req.match_info();
|
||||
let source_id = params
|
||||
.get("source_id")
|
||||
.ok_or_else(|| error::ErrorBadRequest("Invalid table source id"))?;
|
||||
|
||||
let source = table_sources
|
||||
.get(source_id)
|
||||
.ok_or_else(|| error::ErrorNotFound(format!("Table source '{}' not found", source_id)))?;
|
||||
|
||||
let tilejson = build_tilejson(
|
||||
source.clone(),
|
||||
&req.connection_info(),
|
||||
req.path(),
|
||||
req.query_string(),
|
||||
req.headers(),
|
||||
)
|
||||
.map_err(|e| error::ErrorBadRequest(format!("Can't build TileJSON: {}", e)))?;
|
||||
|
||||
Ok(HttpResponse::Ok()
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.json(tilejson))
|
||||
}
|
||||
|
||||
fn get_table_source_tile(
|
||||
req: &HttpRequest<State>,
|
||||
) -> Result<Box<dyn Future<Item = HttpResponse, Error = Error>>> {
|
||||
let state = &req.state();
|
||||
|
||||
let table_sources = state
|
||||
.table_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
|
||||
|
||||
let params = req.match_info();
|
||||
let source_id = params
|
||||
.get("source_id")
|
||||
.ok_or_else(|| error::ErrorBadRequest("Invalid table source id"))?;
|
||||
|
||||
let source = table_sources
|
||||
.get(source_id)
|
||||
.ok_or_else(|| error::ErrorNotFound(format!("Table source '{}' not found", source_id)))?;
|
||||
|
||||
let xyz = parse_xyz(params)
|
||||
.map_err(|e| error::ErrorBadRequest(format!("Can't parse XYZ scheme: {}", e)))?;
|
||||
|
||||
let query = req.query();
|
||||
|
||||
let message = messages::GetTile {
|
||||
xyz,
|
||||
query: query.clone(),
|
||||
source: source.clone(),
|
||||
};
|
||||
|
||||
let result = req.state().db.send(message);
|
||||
|
||||
let response = result
|
||||
.from_err()
|
||||
.and_then(|res| match res {
|
||||
Ok(tile) => match tile.len() {
|
||||
0 => Ok(HttpResponse::NoContent()
|
||||
.content_type("application/x-protobuf")
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.body(tile)),
|
||||
_ => Ok(HttpResponse::Ok()
|
||||
.content_type("application/x-protobuf")
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.body(tile)),
|
||||
},
|
||||
Err(_) => Ok(HttpResponse::InternalServerError().into()),
|
||||
})
|
||||
.responder();
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn get_function_sources(
|
||||
req: &HttpRequest<State>,
|
||||
) -> Result<Box<dyn Future<Item = HttpResponse, Error = Error>>> {
|
||||
let state = &req.state();
|
||||
if state.watch_mode {
|
||||
info!("Scanning database for function sources");
|
||||
let coordinator = state.coordinator.clone();
|
||||
|
||||
let result = req.state().db.send(messages::GetFunctionSources {});
|
||||
let response = result
|
||||
.from_err()
|
||||
.and_then(move |res| match res {
|
||||
Ok(function_sources) => {
|
||||
coordinator.do_send(messages::RefreshFunctionSources {
|
||||
function_sources: Some(function_sources.clone()),
|
||||
});
|
||||
|
||||
Ok(HttpResponse::Ok()
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.json(function_sources))
|
||||
}
|
||||
Err(_) => Ok(HttpResponse::InternalServerError().into()),
|
||||
})
|
||||
.responder();
|
||||
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
let function_sources = state
|
||||
.function_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
|
||||
|
||||
let http_response = HttpResponse::Ok()
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.json(function_sources);
|
||||
|
||||
let response = result(Ok(http_response)).responder();
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
fn get_function_source(req: &HttpRequest<State>) -> Result<HttpResponse> {
|
||||
let state = &req.state();
|
||||
let function_sources = state
|
||||
.function_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no function sources"))?;
|
||||
|
||||
let params = req.match_info();
|
||||
let source_id = params
|
||||
.get("source_id")
|
||||
.ok_or_else(|| error::ErrorBadRequest("Invalid function source id"))?;
|
||||
|
||||
let source = function_sources.get(source_id).ok_or_else(|| {
|
||||
error::ErrorNotFound(format!("Function source '{}' not found", source_id))
|
||||
})?;
|
||||
|
||||
let tilejson = build_tilejson(
|
||||
source.clone(),
|
||||
&req.connection_info(),
|
||||
req.path(),
|
||||
req.query_string(),
|
||||
req.headers(),
|
||||
)
|
||||
.map_err(|e| error::ErrorBadRequest(format!("Can't build TileJSON: {}", e)))?;
|
||||
|
||||
Ok(HttpResponse::Ok()
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.json(tilejson))
|
||||
}
|
||||
|
||||
fn get_function_source_tile(
|
||||
req: &HttpRequest<State>,
|
||||
) -> Result<Box<dyn Future<Item = HttpResponse, Error = Error>>> {
|
||||
let state = &req.state();
|
||||
let function_sources = state
|
||||
.function_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no function sources"))?;
|
||||
|
||||
let params = req.match_info();
|
||||
let source_id = params
|
||||
.get("source_id")
|
||||
.ok_or_else(|| error::ErrorBadRequest("Invalid function source id"))?;
|
||||
|
||||
let source = function_sources.get(source_id).ok_or_else(|| {
|
||||
error::ErrorNotFound(format!("Function source '{}' not found", source_id))
|
||||
})?;
|
||||
|
||||
let xyz = parse_xyz(params)
|
||||
.map_err(|e| error::ErrorBadRequest(format!("Can't parse XYZ scheme: {}", e)))?;
|
||||
|
||||
let query = req.query();
|
||||
|
||||
let message = messages::GetTile {
|
||||
xyz,
|
||||
query: query.clone(),
|
||||
source: source.clone(),
|
||||
};
|
||||
|
||||
let result = req.state().db.send(message);
|
||||
|
||||
let response = result
|
||||
.from_err()
|
||||
.and_then(|res| match res {
|
||||
Ok(tile) => match tile.len() {
|
||||
0 => Ok(HttpResponse::NoContent()
|
||||
.content_type("application/x-protobuf")
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.body(tile)),
|
||||
_ => Ok(HttpResponse::Ok()
|
||||
.content_type("application/x-protobuf")
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.body(tile)),
|
||||
},
|
||||
Err(_) => Ok(HttpResponse::InternalServerError().into()),
|
||||
})
|
||||
.responder();
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
db: Addr<DbExecutor>,
|
||||
coordinator: Addr<CoordinatorActor>,
|
||||
table_sources: Option<TableSources>,
|
||||
function_sources: Option<FunctionSources>,
|
||||
watch_mode: bool,
|
||||
) -> App<State> {
|
||||
let table_sources_rc = Rc::new(RefCell::new(table_sources));
|
||||
let function_sources_rc = Rc::new(RefCell::new(function_sources));
|
||||
|
||||
let worker_actor = WorkerActor {
|
||||
table_sources: table_sources_rc.clone(),
|
||||
function_sources: function_sources_rc.clone(),
|
||||
};
|
||||
|
||||
let worker: Addr<_> = worker_actor.start();
|
||||
coordinator.do_send(messages::Connect { addr: worker });
|
||||
|
||||
let state = State {
|
||||
db,
|
||||
coordinator,
|
||||
table_sources: table_sources_rc.clone(),
|
||||
function_sources: function_sources_rc.clone(),
|
||||
watch_mode,
|
||||
};
|
||||
|
||||
App::with_state(state)
|
||||
.middleware(middleware::Logger::default())
|
||||
.resource("/index.json", |r| {
|
||||
r.method(http::Method::GET).f(get_table_sources)
|
||||
})
|
||||
.resource("/{source_id}.json", |r| {
|
||||
r.method(http::Method::GET).f(get_table_source)
|
||||
})
|
||||
.resource("/{source_id}/{z}/{x}/{y}.pbf", |r| {
|
||||
r.method(http::Method::GET).f(get_table_source_tile)
|
||||
})
|
||||
.resource("/rpc/index.json", |r| {
|
||||
r.method(http::Method::GET).f(get_function_sources)
|
||||
})
|
||||
.resource("/rpc/{source_id}.json", |r| {
|
||||
r.method(http::Method::GET).f(get_function_source)
|
||||
})
|
||||
.resource("/rpc/{source_id}/{z}/{x}/{y}.pbf", |r| {
|
||||
r.method(http::Method::GET).f(get_function_source_tile)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
extern crate env_logger;
|
||||
|
||||
use super::super::db::setup_connection_pool;
|
||||
use super::super::db_executor::DbExecutor;
|
||||
use super::super::function_source::{FunctionSource, FunctionSources};
|
||||
use super::super::table_source::{TableSource, TableSources};
|
||||
use super::*;
|
||||
use actix::SyncArbiter;
|
||||
use actix_web::{http, test};
|
||||
use std::env;
|
||||
|
||||
fn build_test_server(
|
||||
table_sources: Option<TableSources>,
|
||||
function_sources: Option<FunctionSources>,
|
||||
) -> test::TestServer {
|
||||
test::TestServer::build_with_state(move || {
|
||||
let conn_string: String = env::var("DATABASE_URL").unwrap();
|
||||
let pool = setup_connection_pool(&conn_string, None).unwrap();
|
||||
let db = SyncArbiter::start(3, move || DbExecutor(pool.clone()));
|
||||
|
||||
let table_sources_rc = Rc::new(RefCell::new(table_sources.clone()));
|
||||
let function_sources_rc = Rc::new(RefCell::new(function_sources.clone()));
|
||||
|
||||
let coordinator: Addr<_> = CoordinatorActor::default().start();
|
||||
|
||||
let worker_actor = WorkerActor {
|
||||
table_sources: table_sources_rc.clone(),
|
||||
function_sources: function_sources_rc.clone(),
|
||||
};
|
||||
|
||||
let worker: Addr<_> = worker_actor.start();
|
||||
coordinator.do_send(messages::Connect { addr: worker });
|
||||
|
||||
State {
|
||||
db,
|
||||
coordinator,
|
||||
watch_mode: true,
|
||||
table_sources: table_sources_rc.clone(),
|
||||
function_sources: function_sources_rc.clone(),
|
||||
}
|
||||
})
|
||||
.start(|app| {
|
||||
app.resource("/index.json", |r| {
|
||||
r.method(http::Method::GET).f(get_table_sources)
|
||||
})
|
||||
.resource("/{source_id}.json", |r| {
|
||||
r.method(http::Method::GET).f(get_table_source)
|
||||
})
|
||||
.resource("/{source_id}/{z}/{x}/{y}.pbf", |r| {
|
||||
r.method(http::Method::GET).f(get_table_source_tile)
|
||||
})
|
||||
.resource("/rpc/index.json", |r| {
|
||||
r.method(http::Method::GET).f(get_function_sources)
|
||||
})
|
||||
.resource("/rpc/{source_id}.json", |r| {
|
||||
r.method(http::Method::GET).f(get_function_source)
|
||||
})
|
||||
.resource("/rpc/{source_id}/{z}/{x}/{y}.pbf", |r| {
|
||||
r.method(http::Method::GET).f(get_function_source_tile)
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn table_sources_test() {
|
||||
let id = "public.table_source";
|
||||
let source = TableSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
table: "table_source".to_owned(),
|
||||
id_column: None,
|
||||
geometry_column: "geom".to_owned(),
|
||||
srid: 3857,
|
||||
extent: Some(4096),
|
||||
buffer: Some(64),
|
||||
clip_geom: Some(true),
|
||||
geometry_type: None,
|
||||
properties: HashMap::new(),
|
||||
};
|
||||
|
||||
let mut table_sources: TableSources = HashMap::new();
|
||||
table_sources.insert(id.to_owned(), Box::new(source));
|
||||
|
||||
let mut srv = build_test_server(Some(table_sources), None);
|
||||
|
||||
let request = srv
|
||||
.client(http::Method::GET, "/index.json")
|
||||
.finish()
|
||||
.unwrap();
|
||||
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let headers = response.headers();
|
||||
assert!(headers.contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
|
||||
|
||||
let body = response.body().wait().unwrap();
|
||||
let body_str = std::str::from_utf8(&body).unwrap();
|
||||
let table_sources: TableSources = serde_json::from_str(body_str).unwrap();
|
||||
assert!(table_sources.contains_key("public.table_source"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn function_sources_test() {
|
||||
let id = "public.function_source";
|
||||
let source = FunctionSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
function: "function_source".to_owned(),
|
||||
};
|
||||
|
||||
let mut function_sources: FunctionSources = HashMap::new();
|
||||
function_sources.insert(id.to_owned(), Box::new(source));
|
||||
|
||||
let mut srv = build_test_server(None, Some(function_sources));
|
||||
|
||||
let request = srv
|
||||
.client(http::Method::GET, "/rpc/index.json")
|
||||
.finish()
|
||||
.unwrap();
|
||||
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let headers = response.headers();
|
||||
assert!(headers.contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
|
||||
|
||||
let body = response.body().wait().unwrap();
|
||||
let body_str = std::str::from_utf8(&body).unwrap();
|
||||
let function_sources: FunctionSources = serde_json::from_str(body_str).unwrap();
|
||||
|
||||
assert!(function_sources.contains_key("public.function_source"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sources_not_found_test() {
|
||||
let mut srv = build_test_server(None, None);
|
||||
|
||||
let request = srv
|
||||
.client(http::Method::GET, "/public.non_existant.json")
|
||||
.finish()
|
||||
.unwrap();
|
||||
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert_eq!(response.status().as_u16(), 404);
|
||||
|
||||
let request = srv
|
||||
.client(http::Method::GET, "/rpc/public.non_existant.json")
|
||||
.finish()
|
||||
.unwrap();
|
||||
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert_eq!(response.status().as_u16(), 404);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn table_source_test() {
|
||||
let id = "public.table_source";
|
||||
let source = TableSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
table: "table_source".to_owned(),
|
||||
id_column: None,
|
||||
geometry_column: "geom".to_owned(),
|
||||
srid: 3857,
|
||||
extent: Some(4096),
|
||||
buffer: Some(64),
|
||||
clip_geom: Some(true),
|
||||
geometry_type: None,
|
||||
properties: HashMap::new(),
|
||||
};
|
||||
|
||||
let mut table_sources: TableSources = HashMap::new();
|
||||
table_sources.insert(id.to_owned(), Box::new(source));
|
||||
|
||||
let mut srv = build_test_server(Some(table_sources), None);
|
||||
|
||||
let request = srv
|
||||
.client(http::Method::GET, "/public.table_source.json")
|
||||
.finish()
|
||||
.unwrap();
|
||||
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let headers = response.headers();
|
||||
assert!(headers.contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
|
||||
|
||||
let request = srv
|
||||
.client(http::Method::GET, "/public.table_source/0/0/0.pbf")
|
||||
.finish()
|
||||
.unwrap();
|
||||
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let headers = response.headers();
|
||||
assert!(headers.contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn function_source_test() {
|
||||
let id = "public.function_source";
|
||||
let source = FunctionSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
function: "function_source".to_owned(),
|
||||
};
|
||||
|
||||
let mut function_sources: FunctionSources = HashMap::new();
|
||||
function_sources.insert(id.to_owned(), Box::new(source));
|
||||
|
||||
let mut srv = build_test_server(None, Some(function_sources));
|
||||
|
||||
let request = srv
|
||||
.client(http::Method::GET, "/rpc/public.function_source.json")
|
||||
.finish()
|
||||
.unwrap();
|
||||
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let headers = response.headers();
|
||||
assert!(headers.contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
|
||||
|
||||
let request = srv
|
||||
.client(http::Method::GET, "/rpc/public.function_source/0/0/0.pbf")
|
||||
.finish()
|
||||
.unwrap();
|
||||
|
||||
let response = srv.execute(request.send()).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let headers = response.headers();
|
||||
assert!(headers.contains_key(http::header::ACCESS_CONTROL_ALLOW_ORIGIN));
|
||||
}
|
||||
}
|
@ -1,47 +1,80 @@
|
||||
extern crate actix;
|
||||
extern crate actix_web;
|
||||
extern crate docopt;
|
||||
extern crate env_logger;
|
||||
extern crate futures;
|
||||
extern crate tilejson;
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
extern crate num_cpus;
|
||||
extern crate postgres;
|
||||
extern crate r2d2;
|
||||
extern crate r2d2_postgres;
|
||||
extern crate semver;
|
||||
extern crate serde;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
extern crate serde_json;
|
||||
extern crate serde_yaml;
|
||||
|
||||
mod app;
|
||||
mod cli;
|
||||
mod config;
|
||||
mod coordinator_actor;
|
||||
mod db;
|
||||
mod db_executor;
|
||||
mod function_source;
|
||||
mod messages;
|
||||
mod server;
|
||||
mod source;
|
||||
mod table_source;
|
||||
mod utils;
|
||||
mod worker_actor;
|
||||
|
||||
use docopt::Docopt;
|
||||
use std::env;
|
||||
use serde::Deserialize;
|
||||
use std::error::Error;
|
||||
use std::{env, io};
|
||||
|
||||
use cli::{Args, USAGE};
|
||||
use config::{generate_config, read_config, Config};
|
||||
use db::{check_postgis_version, setup_connection_pool, PostgresPool};
|
||||
use utils::prettify_error;
|
||||
use martin::config::{read_config, Config, ConfigBuilder};
|
||||
use martin::db::{check_postgis_version, setup_connection_pool, PostgresPool};
|
||||
use martin::function_source::get_function_sources;
|
||||
use martin::server;
|
||||
use martin::table_source::get_table_sources;
|
||||
use martin::utils::prettify_error;
|
||||
|
||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
const REQUIRED_POSTGIS_VERSION: &str = ">= 2.4.0";
|
||||
|
||||
pub const USAGE: &str = "
|
||||
Martin - PostGIS Mapbox Vector Tiles server.
|
||||
|
||||
Usage:
|
||||
martin [options] [<connection>]
|
||||
martin -h | --help
|
||||
martin -v | --version
|
||||
|
||||
Options:
|
||||
-h --help Show this screen.
|
||||
-v --version Show version.
|
||||
--config=<path> Path to config file.
|
||||
--keep-alive=<n> Connection keep alive timeout [default: 75].
|
||||
--listen-addresses=<n> The socket address to bind [default: 0.0.0.0:3000].
|
||||
--pool-size=<n> Maximum connections pool size [default: 20].
|
||||
--watch Scan for new sources on sources list requests
|
||||
--workers=<n> Number of web server workers.
|
||||
";
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Args {
|
||||
pub arg_connection: Option<String>,
|
||||
pub flag_config: Option<String>,
|
||||
pub flag_help: bool,
|
||||
pub flag_keep_alive: Option<usize>,
|
||||
pub flag_listen_addresses: Option<String>,
|
||||
pub flag_pool_size: Option<u32>,
|
||||
pub flag_watch: bool,
|
||||
pub flag_version: bool,
|
||||
pub flag_workers: Option<usize>,
|
||||
}
|
||||
|
||||
pub fn generate_config(
|
||||
args: Args,
|
||||
connection_string: String,
|
||||
pool: &PostgresPool,
|
||||
) -> io::Result<Config> {
|
||||
let conn = pool
|
||||
.get()
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
|
||||
|
||||
let table_sources = get_table_sources(&conn)?;
|
||||
let function_sources = get_function_sources(&conn)?;
|
||||
|
||||
let config = ConfigBuilder {
|
||||
connection_string,
|
||||
watch: Some(args.flag_watch),
|
||||
keep_alive: args.flag_keep_alive,
|
||||
listen_addresses: args.flag_listen_addresses,
|
||||
pool_size: args.flag_pool_size,
|
||||
worker_processes: args.flag_workers,
|
||||
table_sources: Some(table_sources),
|
||||
function_sources: Some(function_sources),
|
||||
};
|
||||
|
||||
let config = config.finalize();
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
fn setup_from_config(file_name: String) -> Result<(Config, PostgresPool), std::io::Error> {
|
||||
let config = read_config(&file_name).map_err(prettify_error("Can't read config"))?;
|
||||
|
31
src/cli.rs
31
src/cli.rs
@ -1,31 +0,0 @@
|
||||
pub const USAGE: &str = "
|
||||
Martin - PostGIS Mapbox Vector Tiles server.
|
||||
|
||||
Usage:
|
||||
martin [options] [<connection>]
|
||||
martin -h | --help
|
||||
martin -v | --version
|
||||
|
||||
Options:
|
||||
-h --help Show this screen.
|
||||
-v --version Show version.
|
||||
--config=<path> Path to config file.
|
||||
--keep-alive=<n> Connection keep alive timeout [default: 75].
|
||||
--listen-addresses=<n> The socket address to bind [default: 0.0.0.0:3000].
|
||||
--pool-size=<n> Maximum connections pool size [default: 20].
|
||||
--watch Scan for new sources on sources list requests
|
||||
--workers=<n> Number of web server workers.
|
||||
";
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Args {
|
||||
pub arg_connection: Option<String>,
|
||||
pub flag_config: Option<String>,
|
||||
pub flag_help: bool,
|
||||
pub flag_keep_alive: Option<usize>,
|
||||
pub flag_listen_addresses: Option<String>,
|
||||
pub flag_pool_size: Option<u32>,
|
||||
pub flag_watch: bool,
|
||||
pub flag_version: bool,
|
||||
pub flag_workers: Option<usize>,
|
||||
}
|
@ -1,14 +1,12 @@
|
||||
use num_cpus;
|
||||
use serde_yaml;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::error::Error;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::io::prelude::*;
|
||||
|
||||
use super::cli::Args;
|
||||
use super::db::PostgresPool;
|
||||
use super::function_source::{get_function_sources, FunctionSources};
|
||||
use super::table_source::{get_table_sources, TableSources};
|
||||
use crate::function_source::FunctionSources;
|
||||
use crate::table_source::TableSources;
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct Config {
|
||||
@ -23,7 +21,7 @@ pub struct Config {
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct ConfigBuilder {
|
||||
pub struct ConfigBuilder {
|
||||
pub watch: Option<bool>,
|
||||
pub pool_size: Option<u32>,
|
||||
pub keep_alive: Option<usize>,
|
||||
@ -61,30 +59,3 @@ pub fn read_config(file_name: &str) -> io::Result<Config> {
|
||||
|
||||
Ok(config_builder.finalize())
|
||||
}
|
||||
|
||||
pub fn generate_config(
|
||||
args: Args,
|
||||
connection_string: String,
|
||||
pool: &PostgresPool,
|
||||
) -> io::Result<Config> {
|
||||
let conn = pool
|
||||
.get()
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
|
||||
|
||||
let table_sources = get_table_sources(&conn)?;
|
||||
let function_sources = get_function_sources(&conn)?;
|
||||
|
||||
let config = ConfigBuilder {
|
||||
connection_string,
|
||||
watch: Some(args.flag_watch),
|
||||
keep_alive: args.flag_keep_alive,
|
||||
listen_addresses: args.flag_listen_addresses,
|
||||
pool_size: args.flag_pool_size,
|
||||
worker_processes: args.flag_workers,
|
||||
table_sources: Some(table_sources),
|
||||
function_sources: Some(function_sources),
|
||||
};
|
||||
|
||||
let config = config.finalize();
|
||||
Ok(config)
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
use actix::prelude::*;
|
||||
use actix::{Actor, Addr, Context, Handler};
|
||||
|
||||
use super::messages;
|
||||
use super::worker_actor::WorkerActor;
|
||||
use crate::messages;
|
||||
use crate::worker_actor::WorkerActor;
|
||||
|
||||
pub struct CoordinatorActor {
|
||||
workers: Vec<Addr<WorkerActor>>,
|
||||
|
@ -1,19 +1,19 @@
|
||||
use actix::prelude::*;
|
||||
use actix::{Actor, Handler, SyncContext};
|
||||
use std::io;
|
||||
|
||||
use super::db::PostgresPool;
|
||||
use super::function_source::{get_function_sources, FunctionSources};
|
||||
use super::messages;
|
||||
use super::source::Tile;
|
||||
use super::table_source::{get_table_sources, TableSources};
|
||||
use crate::db::PostgresPool;
|
||||
use crate::function_source::{get_function_sources, FunctionSources};
|
||||
use crate::messages;
|
||||
use crate::source::Tile;
|
||||
use crate::table_source::{get_table_sources, TableSources};
|
||||
|
||||
pub struct DbExecutor(pub PostgresPool);
|
||||
pub struct DBActor(pub PostgresPool);
|
||||
|
||||
impl Actor for DbExecutor {
|
||||
impl Actor for DBActor {
|
||||
type Context = SyncContext<Self>;
|
||||
}
|
||||
|
||||
impl Handler<messages::GetTableSources> for DbExecutor {
|
||||
impl Handler<messages::GetTableSources> for DBActor {
|
||||
type Result = Result<TableSources, io::Error>;
|
||||
|
||||
fn handle(&mut self, _msg: messages::GetTableSources, _: &mut Self::Context) -> Self::Result {
|
||||
@ -23,7 +23,7 @@ impl Handler<messages::GetTableSources> for DbExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<messages::GetFunctionSources> for DbExecutor {
|
||||
impl Handler<messages::GetFunctionSources> for DBActor {
|
||||
type Result = Result<FunctionSources, io::Error>;
|
||||
|
||||
fn handle(&mut self, _msg: messages::GetFunctionSources, _: &mut Self::Context) -> Self::Result {
|
||||
@ -33,7 +33,7 @@ impl Handler<messages::GetFunctionSources> for DbExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<messages::GetTile> for DbExecutor {
|
||||
impl Handler<messages::GetTile> for DBActor {
|
||||
type Result = Result<Tile, io::Error>;
|
||||
|
||||
fn handle(&mut self, msg: messages::GetTile, _: &mut Self::Context) -> Self::Result {
|
72
src/dev.rs
Normal file
72
src/dev.rs
Normal file
@ -0,0 +1,72 @@
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::rc::Rc;
|
||||
|
||||
use actix::{Actor, Addr, SyncArbiter};
|
||||
|
||||
use crate::coordinator_actor::CoordinatorActor;
|
||||
use crate::db::setup_connection_pool;
|
||||
use crate::db_actor::DBActor;
|
||||
use crate::function_source::{FunctionSource, FunctionSources};
|
||||
use crate::server::AppState;
|
||||
use crate::table_source::{TableSource, TableSources};
|
||||
|
||||
pub fn mock_table_sources() -> Option<TableSources> {
|
||||
let id = "public.table_source";
|
||||
let source = TableSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
table: "table_source".to_owned(),
|
||||
id_column: None,
|
||||
geometry_column: "geom".to_owned(),
|
||||
srid: 3857,
|
||||
extent: Some(4096),
|
||||
buffer: Some(64),
|
||||
clip_geom: Some(true),
|
||||
geometry_type: None,
|
||||
properties: HashMap::new(),
|
||||
};
|
||||
|
||||
let mut table_sources: TableSources = HashMap::new();
|
||||
table_sources.insert(id.to_owned(), Box::new(source));
|
||||
Some(table_sources)
|
||||
}
|
||||
|
||||
pub fn mock_function_sources() -> Option<FunctionSources> {
|
||||
let id = "public.function_source";
|
||||
let source = FunctionSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
function: "function_source".to_owned(),
|
||||
};
|
||||
|
||||
let mut function_sources: FunctionSources = HashMap::new();
|
||||
function_sources.insert(id.to_owned(), Box::new(source));
|
||||
Some(function_sources)
|
||||
}
|
||||
|
||||
pub fn mock_state(
|
||||
table_sources: Option<TableSources>,
|
||||
function_sources: Option<FunctionSources>,
|
||||
) -> AppState {
|
||||
let connection_string: String = env::var("DATABASE_URL").unwrap();
|
||||
info!("Connecting to {}", connection_string);
|
||||
|
||||
let pool = setup_connection_pool(&connection_string, Some(1)).unwrap();
|
||||
info!("Connected to {}", connection_string);
|
||||
|
||||
let db = SyncArbiter::start(3, move || DBActor(pool.clone()));
|
||||
let coordinator: Addr<_> = CoordinatorActor::default().start();
|
||||
|
||||
let table_sources = Rc::new(RefCell::new(table_sources));
|
||||
let function_sources = Rc::new(RefCell::new(function_sources));
|
||||
|
||||
AppState {
|
||||
db: db.clone(),
|
||||
coordinator: coordinator.clone(),
|
||||
table_sources,
|
||||
function_sources,
|
||||
watch_mode: false,
|
||||
}
|
||||
}
|
@ -1,11 +1,12 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::io;
|
||||
use tilejson::{TileJSON, TileJSONBuilder};
|
||||
|
||||
use super::app::Query;
|
||||
use super::db::PostgresConnection;
|
||||
use super::source::{Source, Tile, XYZ};
|
||||
use super::utils::query_to_json_string;
|
||||
use crate::db::PostgresConnection;
|
||||
use crate::source::{Query, Source, Tile, XYZ};
|
||||
use crate::utils::query_to_json_string;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct FunctionSource {
|
||||
@ -21,14 +22,27 @@ impl Source for FunctionSource {
|
||||
self.id.as_str()
|
||||
}
|
||||
|
||||
fn get_tilejson(&self) -> Result<TileJSON, io::Error> {
|
||||
let mut tilejson_builder = TileJSONBuilder::new();
|
||||
|
||||
tilejson_builder.scheme("tms");
|
||||
tilejson_builder.name(&self.id);
|
||||
tilejson_builder.tiles(vec![]);
|
||||
|
||||
Ok(tilejson_builder.finalize())
|
||||
}
|
||||
|
||||
fn get_tile(
|
||||
&self,
|
||||
conn: &PostgresConnection,
|
||||
xyz: &XYZ,
|
||||
query: &Query,
|
||||
query: &Option<Query>,
|
||||
) -> Result<Tile, io::Error> {
|
||||
let empty_query = HashMap::new();
|
||||
let query = query.as_ref().unwrap_or(&empty_query);
|
||||
|
||||
let query_json_string =
|
||||
query_to_json_string(query).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
query_to_json_string(&query).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
|
||||
let query = format!(
|
||||
include_str!("scripts/call_rpc.sql"),
|
||||
|
15
src/lib.rs
Normal file
15
src/lib.rs
Normal file
@ -0,0 +1,15 @@
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
pub mod config;
|
||||
pub mod coordinator_actor;
|
||||
pub mod db;
|
||||
pub mod db_actor;
|
||||
pub mod dev;
|
||||
pub mod function_source;
|
||||
pub mod messages;
|
||||
pub mod server;
|
||||
pub mod source;
|
||||
pub mod table_source;
|
||||
pub mod utils;
|
||||
pub mod worker_actor;
|
@ -1,11 +1,10 @@
|
||||
use actix::prelude::*;
|
||||
use actix::{Addr, Message};
|
||||
use std::io;
|
||||
|
||||
use super::app::Query;
|
||||
use super::function_source::FunctionSources;
|
||||
use super::source::{Source, Tile, XYZ};
|
||||
use super::table_source::TableSources;
|
||||
use super::worker_actor::WorkerActor;
|
||||
use crate::function_source::FunctionSources;
|
||||
use crate::source::{Query, Source, Tile, XYZ};
|
||||
use crate::table_source::TableSources;
|
||||
use crate::worker_actor::WorkerActor;
|
||||
|
||||
pub struct Connect {
|
||||
pub addr: Addr<WorkerActor>,
|
||||
@ -17,7 +16,7 @@ impl Message for Connect {
|
||||
|
||||
pub struct GetTile {
|
||||
pub xyz: XYZ,
|
||||
pub query: Query,
|
||||
pub query: Option<Query>,
|
||||
pub source: Box<dyn Source + Send>,
|
||||
}
|
||||
|
||||
|
364
src/server.rs
364
src/server.rs
@ -1,30 +1,358 @@
|
||||
use actix::{Actor, Addr, SyncArbiter, System, SystemRunner};
|
||||
use actix_web::server;
|
||||
use serde::Deserialize;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::rc::Rc;
|
||||
|
||||
use super::app;
|
||||
use super::config::Config;
|
||||
use super::coordinator_actor::CoordinatorActor;
|
||||
use super::db::PostgresPool;
|
||||
use super::db_executor::DbExecutor;
|
||||
use actix::{Actor, Addr, SyncArbiter, SystemRunner};
|
||||
use actix_web::{
|
||||
error, http, middleware, web, App, Either, Error, HttpRequest, HttpResponse, HttpServer, Result,
|
||||
};
|
||||
use futures::Future;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::coordinator_actor::CoordinatorActor;
|
||||
use crate::db::PostgresPool;
|
||||
use crate::db_actor::DBActor;
|
||||
use crate::function_source::FunctionSources;
|
||||
use crate::messages;
|
||||
use crate::source::{Source, XYZ};
|
||||
use crate::table_source::TableSources;
|
||||
use crate::worker_actor::WorkerActor;
|
||||
|
||||
pub struct AppState {
|
||||
pub db: Addr<DBActor>,
|
||||
pub coordinator: Addr<CoordinatorActor>,
|
||||
pub table_sources: Rc<RefCell<Option<TableSources>>>,
|
||||
pub function_sources: Rc<RefCell<Option<FunctionSources>>>,
|
||||
pub watch_mode: bool,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct SourceRequest {
|
||||
source_id: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct TileRequest {
|
||||
source_id: String,
|
||||
z: u32,
|
||||
x: u32,
|
||||
y: u32,
|
||||
#[allow(dead_code)]
|
||||
format: String,
|
||||
}
|
||||
|
||||
type SourcesResult = Either<HttpResponse, Box<dyn Future<Item = HttpResponse, Error = Error>>>;
|
||||
|
||||
fn get_table_sources(state: web::Data<AppState>) -> SourcesResult {
|
||||
if !state.watch_mode {
|
||||
let table_sources = state.table_sources.borrow().clone();
|
||||
let response = HttpResponse::Ok().json(table_sources);
|
||||
return Either::A(response);
|
||||
}
|
||||
|
||||
info!("Scanning database for table sources");
|
||||
let response = state
|
||||
.db
|
||||
.send(messages::GetTableSources {})
|
||||
.from_err()
|
||||
.and_then(move |table_sources| match table_sources {
|
||||
Ok(table_sources) => {
|
||||
state.coordinator.do_send(messages::RefreshTableSources {
|
||||
table_sources: Some(table_sources.clone()),
|
||||
});
|
||||
|
||||
Ok(HttpResponse::Ok().json(table_sources))
|
||||
}
|
||||
Err(_) => Ok(HttpResponse::InternalServerError().into()),
|
||||
});
|
||||
|
||||
Either::B(Box::new(response))
|
||||
}
|
||||
|
||||
fn get_table_source(
|
||||
req: HttpRequest,
|
||||
path: web::Path<SourceRequest>,
|
||||
state: web::Data<AppState>,
|
||||
) -> Result<HttpResponse> {
|
||||
let table_sources = state
|
||||
.table_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
|
||||
|
||||
let source = table_sources.get(&path.source_id).ok_or_else(|| {
|
||||
error::ErrorNotFound(format!("Table source '{}' not found", path.source_id))
|
||||
})?;
|
||||
|
||||
let mut tilejson = source
|
||||
.get_tilejson()
|
||||
.map_err(|e| error::ErrorBadRequest(format!("Can't build TileJSON: {}", e)))?;
|
||||
|
||||
let tiles_path = req
|
||||
.headers()
|
||||
.get("x-rewrite-url")
|
||||
.map_or(Ok(req.path().trim_end_matches(".json")), |header| {
|
||||
let header_str = header.to_str()?;
|
||||
Ok(header_str.trim_end_matches(".json"))
|
||||
})
|
||||
.map_err(|e: http::header::ToStrError| {
|
||||
error::ErrorBadRequest(format!("Can't build TileJSON: {}", e))
|
||||
})?;
|
||||
|
||||
let query_string = req.query_string();
|
||||
let query = if query_string.is_empty() {
|
||||
query_string.to_owned()
|
||||
} else {
|
||||
format!("?{}", query_string)
|
||||
};
|
||||
|
||||
let connection_info = req.connection_info();
|
||||
|
||||
let tiles_url = format!(
|
||||
"{}://{}{}/{{z}}/{{x}}/{{y}}.pbf{}",
|
||||
connection_info.scheme(),
|
||||
connection_info.host(),
|
||||
tiles_path,
|
||||
query
|
||||
);
|
||||
|
||||
tilejson.tiles = vec![tiles_url];
|
||||
Ok(HttpResponse::Ok().json(tilejson))
|
||||
}
|
||||
|
||||
fn get_table_source_tile(
|
||||
path: web::Path<TileRequest>,
|
||||
state: web::Data<AppState>,
|
||||
) -> Result<Box<dyn Future<Item = HttpResponse, Error = Error>>> {
|
||||
let table_sources = state
|
||||
.table_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
|
||||
|
||||
let source = table_sources.get(&path.source_id).ok_or_else(|| {
|
||||
error::ErrorNotFound(format!("Table source '{}' not found", path.source_id))
|
||||
})?;
|
||||
|
||||
let xyz = XYZ {
|
||||
z: path.z,
|
||||
x: path.x,
|
||||
y: path.y,
|
||||
};
|
||||
|
||||
let message = messages::GetTile {
|
||||
xyz,
|
||||
query: None,
|
||||
source: source.clone(),
|
||||
};
|
||||
|
||||
let response = state
|
||||
.db
|
||||
.send(message)
|
||||
.from_err()
|
||||
.and_then(|result| match result {
|
||||
Ok(tile) => match tile.len() {
|
||||
0 => Ok(HttpResponse::NoContent()
|
||||
.content_type("application/x-protobuf")
|
||||
.body(tile)),
|
||||
_ => Ok(HttpResponse::Ok()
|
||||
.content_type("application/x-protobuf")
|
||||
.body(tile)),
|
||||
},
|
||||
Err(_) => Ok(HttpResponse::InternalServerError().into()),
|
||||
});
|
||||
|
||||
Ok(Box::new(response))
|
||||
}
|
||||
|
||||
fn get_function_sources(state: web::Data<AppState>) -> SourcesResult {
|
||||
if !state.watch_mode {
|
||||
let function_sources = state.function_sources.borrow().clone();
|
||||
let response = HttpResponse::Ok().json(function_sources);
|
||||
return Either::A(response);
|
||||
}
|
||||
|
||||
info!("Scanning database for function sources");
|
||||
let response = state
|
||||
.db
|
||||
.send(messages::GetFunctionSources {})
|
||||
.from_err()
|
||||
.and_then(move |function_sources| match function_sources {
|
||||
Ok(function_sources) => {
|
||||
state.coordinator.do_send(messages::RefreshFunctionSources {
|
||||
function_sources: Some(function_sources.clone()),
|
||||
});
|
||||
|
||||
Ok(HttpResponse::Ok().json(function_sources))
|
||||
}
|
||||
Err(_) => Ok(HttpResponse::InternalServerError().into()),
|
||||
});
|
||||
|
||||
Either::B(Box::new(response))
|
||||
}
|
||||
|
||||
fn get_function_source(
|
||||
req: HttpRequest,
|
||||
path: web::Path<SourceRequest>,
|
||||
state: web::Data<AppState>,
|
||||
) -> Result<HttpResponse> {
|
||||
let function_sources = state
|
||||
.function_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no function sources"))?;
|
||||
|
||||
let source = function_sources.get(&path.source_id).ok_or_else(|| {
|
||||
error::ErrorNotFound(format!("Function source '{}' not found", path.source_id))
|
||||
})?;
|
||||
|
||||
let mut tilejson = source
|
||||
.get_tilejson()
|
||||
.map_err(|e| error::ErrorBadRequest(format!("Can't build TileJSON: {}", e)))?;
|
||||
|
||||
let tiles_path = req
|
||||
.headers()
|
||||
.get("x-rewrite-url")
|
||||
.map_or(Ok(req.path().trim_end_matches(".json")), |header| {
|
||||
let header_str = header.to_str()?;
|
||||
Ok(header_str.trim_end_matches(".json"))
|
||||
})
|
||||
.map_err(|e: http::header::ToStrError| {
|
||||
error::ErrorBadRequest(format!("Can't build TileJSON: {}", e))
|
||||
})?;
|
||||
|
||||
let query_string = req.query_string();
|
||||
let query = if query_string.is_empty() {
|
||||
query_string.to_owned()
|
||||
} else {
|
||||
format!("?{}", query_string)
|
||||
};
|
||||
|
||||
let connection_info = req.connection_info();
|
||||
|
||||
let tiles_url = format!(
|
||||
"{}://{}{}/{{z}}/{{x}}/{{y}}.pbf{}",
|
||||
connection_info.scheme(),
|
||||
connection_info.host(),
|
||||
tiles_path,
|
||||
query
|
||||
);
|
||||
|
||||
tilejson.tiles = vec![tiles_url];
|
||||
Ok(HttpResponse::Ok().json(tilejson))
|
||||
}
|
||||
|
||||
fn get_function_source_tile(
|
||||
path: web::Path<TileRequest>,
|
||||
query: web::Query<HashMap<String, String>>,
|
||||
state: web::Data<AppState>,
|
||||
) -> Result<Box<dyn Future<Item = HttpResponse, Error = Error>>> {
|
||||
let function_sources = state
|
||||
.function_sources
|
||||
.borrow()
|
||||
.clone()
|
||||
.ok_or_else(|| error::ErrorNotFound("There is no function sources"))?;
|
||||
|
||||
let source = function_sources.get(&path.source_id).ok_or_else(|| {
|
||||
error::ErrorNotFound(format!("Function source '{}' not found", path.source_id))
|
||||
})?;
|
||||
|
||||
let xyz = XYZ {
|
||||
z: path.z,
|
||||
x: path.x,
|
||||
y: path.y,
|
||||
};
|
||||
|
||||
let message = messages::GetTile {
|
||||
xyz,
|
||||
query: Some(query.into_inner()),
|
||||
source: source.clone(),
|
||||
};
|
||||
|
||||
let response = state
|
||||
.db
|
||||
.send(message)
|
||||
.from_err()
|
||||
.and_then(|result| match result {
|
||||
Ok(tile) => match tile.len() {
|
||||
0 => Ok(HttpResponse::NoContent()
|
||||
.content_type("application/x-protobuf")
|
||||
.body(tile)),
|
||||
_ => Ok(HttpResponse::Ok()
|
||||
.content_type("application/x-protobuf")
|
||||
.body(tile)),
|
||||
},
|
||||
Err(_) => Ok(HttpResponse::InternalServerError().into()),
|
||||
});
|
||||
|
||||
Ok(Box::new(response))
|
||||
}
|
||||
|
||||
pub fn router(cfg: &mut web::ServiceConfig) {
|
||||
cfg.route("/index.json", web::get().to(get_table_sources))
|
||||
.route("/{source_id}.json", web::get().to(get_table_source))
|
||||
.route(
|
||||
"/{source_id}/{z}/{x}/{y}.{format}",
|
||||
web::get().to(get_table_source_tile),
|
||||
)
|
||||
.route("/rpc/index.json", web::get().to(get_function_sources))
|
||||
.route("/rpc/{source_id}.json", web::get().to(get_function_source))
|
||||
.route(
|
||||
"/rpc/{source_id}/{z}/{x}/{y}.{format}",
|
||||
web::get().to(get_function_source_tile),
|
||||
);
|
||||
}
|
||||
|
||||
fn create_state(
|
||||
db: Addr<DBActor>,
|
||||
coordinator: Addr<CoordinatorActor>,
|
||||
config: Config,
|
||||
watch_mode: bool,
|
||||
) -> AppState {
|
||||
let table_sources = Rc::new(RefCell::new(config.table_sources));
|
||||
let function_sources = Rc::new(RefCell::new(config.function_sources));
|
||||
|
||||
let worker_actor = WorkerActor {
|
||||
table_sources: table_sources.clone(),
|
||||
function_sources: function_sources.clone(),
|
||||
};
|
||||
|
||||
let worker: Addr<_> = worker_actor.start();
|
||||
coordinator.do_send(messages::Connect { addr: worker });
|
||||
|
||||
AppState {
|
||||
db: db.clone(),
|
||||
coordinator: coordinator.clone(),
|
||||
table_sources,
|
||||
function_sources,
|
||||
watch_mode,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(pool: PostgresPool, config: Config, watch_mode: bool) -> SystemRunner {
|
||||
let server = System::new("server");
|
||||
let sys = actix_rt::System::new("server");
|
||||
|
||||
let db = SyncArbiter::start(3, move || DbExecutor(pool.clone()));
|
||||
let db = SyncArbiter::start(3, move || DBActor(pool.clone()));
|
||||
let coordinator: Addr<_> = CoordinatorActor::default().start();
|
||||
|
||||
let keep_alive = config.keep_alive;
|
||||
let worker_processes = config.worker_processes;
|
||||
let listen_addresses = config.listen_addresses.clone();
|
||||
|
||||
let _addr = server::new(move || {
|
||||
app::new(
|
||||
db.clone(),
|
||||
coordinator.clone(),
|
||||
config.table_sources.clone(),
|
||||
config.function_sources.clone(),
|
||||
watch_mode,
|
||||
)
|
||||
HttpServer::new(move || {
|
||||
let state = create_state(db.clone(), coordinator.clone(), config.clone(), watch_mode);
|
||||
|
||||
let cors_middleware = middleware::DefaultHeaders::new()
|
||||
.header(http::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*");
|
||||
|
||||
App::new()
|
||||
.data(state)
|
||||
.wrap(cors_middleware)
|
||||
.wrap(middleware::NormalizePath)
|
||||
.wrap(middleware::Logger::default())
|
||||
.wrap(middleware::Compress::default())
|
||||
.configure(router)
|
||||
})
|
||||
.bind(listen_addresses.clone())
|
||||
.unwrap_or_else(|_| panic!("Can't bind to {}", listen_addresses))
|
||||
@ -33,5 +361,5 @@ pub fn new(pool: PostgresPool, config: Config, watch_mode: bool) -> SystemRunner
|
||||
.workers(worker_processes)
|
||||
.start();
|
||||
|
||||
server
|
||||
sys
|
||||
}
|
||||
|
@ -1,10 +1,13 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::io;
|
||||
|
||||
use super::app::Query;
|
||||
use super::db::PostgresConnection;
|
||||
use tilejson::TileJSON;
|
||||
|
||||
use crate::db::PostgresConnection;
|
||||
|
||||
pub type Tile = Vec<u8>;
|
||||
pub type Query = HashMap<String, String>;
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct XYZ {
|
||||
@ -15,11 +18,14 @@ pub struct XYZ {
|
||||
|
||||
pub trait Source: Debug {
|
||||
fn get_id(&self) -> &str;
|
||||
|
||||
fn get_tilejson(&self) -> Result<TileJSON, io::Error>;
|
||||
|
||||
fn get_tile(
|
||||
&self,
|
||||
conn: &PostgresConnection,
|
||||
xyz: &XYZ,
|
||||
query: &Query,
|
||||
query: &Option<Query>,
|
||||
) -> Result<Tile, io::Error>;
|
||||
}
|
||||
|
||||
|
@ -1,11 +1,13 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::io;
|
||||
|
||||
use super::app::Query;
|
||||
use super::db::PostgresConnection;
|
||||
use super::source::{Source, Tile, XYZ};
|
||||
use super::utils;
|
||||
use tilejson::{TileJSON, TileJSONBuilder};
|
||||
|
||||
use crate::db::PostgresConnection;
|
||||
use crate::source::{Query, Source, Tile, XYZ};
|
||||
use crate::utils;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct TableSource {
|
||||
@ -29,11 +31,20 @@ impl Source for TableSource {
|
||||
self.id.as_str()
|
||||
}
|
||||
|
||||
fn get_tilejson(&self) -> Result<TileJSON, io::Error> {
|
||||
let mut tilejson_builder = TileJSONBuilder::new();
|
||||
|
||||
tilejson_builder.scheme("tms");
|
||||
tilejson_builder.name(&self.id);
|
||||
|
||||
Ok(tilejson_builder.finalize())
|
||||
}
|
||||
|
||||
fn get_tile(
|
||||
&self,
|
||||
conn: &PostgresConnection,
|
||||
xyz: &XYZ,
|
||||
_query: &Query,
|
||||
_query: &Option<Query>,
|
||||
) -> Result<Tile, io::Error> {
|
||||
let mercator_bounds = utils::tilebbox(xyz);
|
||||
|
||||
|
63
src/utils.rs
63
src/utils.rs
@ -1,73 +1,12 @@
|
||||
use actix_web::dev::{ConnectionInfo, Params};
|
||||
use actix_web::http::header::{HeaderMap, ToStrError};
|
||||
use serde_json;
|
||||
use std::collections::HashMap;
|
||||
use tilejson::{TileJSON, TileJSONBuilder};
|
||||
|
||||
use super::app::Query;
|
||||
use super::source::{Source, XYZ};
|
||||
use crate::source::{Query, XYZ};
|
||||
|
||||
pub fn prettify_error<E: std::fmt::Display>(message: &'static str) -> impl Fn(E) -> std::io::Error {
|
||||
move |error| std::io::Error::new(std::io::ErrorKind::Other, format!("{}: {}", message, error))
|
||||
}
|
||||
|
||||
pub fn build_tilejson(
|
||||
source: Box<dyn Source>,
|
||||
connection_info: &ConnectionInfo,
|
||||
path: &str,
|
||||
query_string: &str,
|
||||
headers: &HeaderMap,
|
||||
) -> Result<TileJSON, ToStrError> {
|
||||
let source_id = source.get_id();
|
||||
|
||||
let path = headers
|
||||
.get("x-rewrite-url")
|
||||
.map_or(Ok(path.trim_end_matches(".json")), |header| {
|
||||
let header_str = header.to_str()?;
|
||||
Ok(header_str.trim_end_matches(".json"))
|
||||
})?;
|
||||
|
||||
let query = if query_string.is_empty() {
|
||||
query_string.to_owned()
|
||||
} else {
|
||||
format!("?{}", query_string)
|
||||
};
|
||||
|
||||
let tiles_url = format!(
|
||||
"{}://{}{}/{{z}}/{{x}}/{{y}}.pbf{}",
|
||||
connection_info.scheme(),
|
||||
connection_info.host(),
|
||||
path,
|
||||
query
|
||||
);
|
||||
|
||||
let mut tilejson_builder = TileJSONBuilder::new();
|
||||
tilejson_builder.scheme("tms");
|
||||
tilejson_builder.name(source_id);
|
||||
tilejson_builder.tiles(vec![&tiles_url]);
|
||||
|
||||
Ok(tilejson_builder.finalize())
|
||||
}
|
||||
|
||||
pub fn parse_xyz(params: &Params) -> Result<XYZ, &str> {
|
||||
let z = params
|
||||
.get("z")
|
||||
.and_then(|i| i.parse::<u32>().ok())
|
||||
.ok_or("invalid z value")?;
|
||||
|
||||
let x = params
|
||||
.get("x")
|
||||
.and_then(|i| i.parse::<u32>().ok())
|
||||
.ok_or("invalid x value")?;
|
||||
|
||||
let y = params
|
||||
.get("y")
|
||||
.and_then(|i| i.parse::<u32>().ok())
|
||||
.ok_or("invalid y value")?;
|
||||
|
||||
Ok(XYZ { x, y, z })
|
||||
}
|
||||
|
||||
// https://github.com/mapbox/postgis-vt-util/blob/master/src/TileBBox.sql
|
||||
pub fn tilebbox(xyz: &XYZ) -> String {
|
||||
let x = xyz.x;
|
||||
|
@ -1,10 +1,10 @@
|
||||
use actix::prelude::*;
|
||||
use actix::{Actor, Context, Handler};
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
|
||||
use super::function_source::FunctionSources;
|
||||
use super::messages;
|
||||
use super::table_source::TableSources;
|
||||
use crate::function_source::FunctionSources;
|
||||
use crate::messages;
|
||||
use crate::table_source::TableSources;
|
||||
|
||||
pub struct WorkerActor {
|
||||
pub table_sources: Rc<RefCell<Option<TableSources>>>,
|
||||
|
@ -32,28 +32,36 @@
|
||||
<div id="map"></div>
|
||||
|
||||
<script>
|
||||
mapboxgl.accessToken =
|
||||
'pk.eyJ1Ijoic3RlcGFua3V6bWluIiwiYSI6Ik1ieW5udm8ifQ.25EOEC2-N92NCWT0Ci9w-Q';
|
||||
let accessToken = localStorage.getItem('accessToken');
|
||||
|
||||
var map = new mapboxgl.Map({
|
||||
if (!accessToken) {
|
||||
accessToken = window.prompt('Mapbox accessToken');
|
||||
localStorage.setItem('accessToken', accessToken);
|
||||
}
|
||||
|
||||
mapboxgl.accessToken = accessToken;
|
||||
|
||||
const map = new mapboxgl.Map({
|
||||
container: 'map',
|
||||
style: 'mapbox://styles/mapbox/light-v9',
|
||||
zoom: 0,
|
||||
center: [0, 0]
|
||||
});
|
||||
|
||||
const sourceId = 'public.points';
|
||||
|
||||
map.on('load', function() {
|
||||
map.addLayer({
|
||||
id: 'public.points',
|
||||
id: sourceId,
|
||||
type: 'circle',
|
||||
source: {
|
||||
type: 'vector',
|
||||
url: 'http://0.0.0.0:3000/public.points.json'
|
||||
url: `http://0.0.0.0:3000/${sourceId}.json`
|
||||
},
|
||||
'source-layer': 'public.points'
|
||||
'source-layer': sourceId
|
||||
});
|
||||
|
||||
map.on('click', 'public.points', function(event) {
|
||||
map.on('click', sourceId, function(event) {
|
||||
console.log(event.features);
|
||||
});
|
||||
});
|
||||
|
7
tests/fixtures/function_source.sql
vendored
7
tests/fixtures/function_source.sql
vendored
@ -1,16 +1,15 @@
|
||||
DROP FUNCTION IF EXISTS public.function_source;
|
||||
CREATE OR REPLACE FUNCTION public.function_source(z integer, x integer, y integer, query_params json) RETURNS bytea AS $$
|
||||
DECLARE
|
||||
bounds geometry;
|
||||
mvt bytea;
|
||||
BEGIN
|
||||
SELECT INTO bounds TileBBox(z, x, y, 3857);
|
||||
RAISE NOTICE 'query_params: %', query_params;
|
||||
|
||||
SELECT INTO mvt ST_AsMVT(tile, 'public.function_source', 4096, 'geom') FROM (
|
||||
SELECT
|
||||
ST_AsMVTGeom(geom, bounds, 4096, 64, true) AS geom
|
||||
ST_AsMVTGeom(ST_Transform(geom, 3857), TileBBox(z, x, y, 3857), 4096, 64, true) AS geom
|
||||
FROM public.table_source
|
||||
WHERE geom && bounds
|
||||
WHERE geom && TileBBox(z, x, y, 4326)
|
||||
) as tile WHERE geom IS NOT NULL;
|
||||
|
||||
RETURN mvt;
|
||||
|
107
tests/server_test.rs
Normal file
107
tests/server_test.rs
Normal file
@ -0,0 +1,107 @@
|
||||
extern crate log;
|
||||
|
||||
use actix_web::dev::Service;
|
||||
use actix_web::{http, test, App};
|
||||
|
||||
use martin::dev::{mock_function_sources, mock_state, mock_table_sources};
|
||||
use martin::function_source::FunctionSources;
|
||||
use martin::server::router;
|
||||
use martin::table_source::TableSources;
|
||||
|
||||
#[test]
|
||||
fn test_get_table_sources_ok() {
|
||||
let state = test::run_on(|| mock_state(mock_table_sources(), None));
|
||||
let mut app = test::init_service(App::new().data(state).configure(router));
|
||||
|
||||
let req = test::TestRequest::get().uri("/index.json").to_request();
|
||||
|
||||
let response = test::block_on(app.call(req)).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let body = test::read_body(response);
|
||||
let table_sources: TableSources = serde_json::from_slice(&body).unwrap();
|
||||
assert!(table_sources.contains_key("public.table_source"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_table_source_ok() {
|
||||
let state = test::run_on(|| mock_state(mock_table_sources(), None));
|
||||
let mut app = test::init_service(App::new().data(state).configure(router));
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.non_existant.json")
|
||||
.to_request();
|
||||
|
||||
let response = test::block_on(app.call(req)).unwrap();
|
||||
assert_eq!(response.status(), http::StatusCode::NOT_FOUND);
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.table_source.json")
|
||||
.to_request();
|
||||
|
||||
let response = test::block_on(app.call(req)).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_table_source_tile_ok() {
|
||||
let state = test::run_on(|| mock_state(mock_table_sources(), None));
|
||||
let mut app = test::init_service(App::new().data(state).configure(router));
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.table_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
|
||||
let future = test::run_on(|| app.call(req));
|
||||
let response = test::block_on(future).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_function_sources_ok() {
|
||||
let state = test::run_on(|| mock_state(None, mock_function_sources()));
|
||||
let mut app = test::init_service(App::new().data(state).configure(router));
|
||||
|
||||
let req = test::TestRequest::get().uri("/rpc/index.json").to_request();
|
||||
|
||||
let response = test::block_on(app.call(req)).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let body = test::read_body(response);
|
||||
let function_sources: FunctionSources = serde_json::from_slice(&body).unwrap();
|
||||
assert!(function_sources.contains_key("public.function_source"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_function_source_ok() {
|
||||
let state = test::run_on(|| mock_state(None, mock_function_sources()));
|
||||
let mut app = test::init_service(App::new().data(state).configure(router));
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.non_existant.json")
|
||||
.to_request();
|
||||
|
||||
let response = test::block_on(app.call(req)).unwrap();
|
||||
assert_eq!(response.status(), http::StatusCode::NOT_FOUND);
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.function_source.json")
|
||||
.to_request();
|
||||
|
||||
let response = test::block_on(app.call(req)).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_function_source_tile_ok() {
|
||||
let state = test::run_on(|| mock_state(None, mock_function_sources()));
|
||||
let mut app = test::init_service(App::new().data(state).configure(router));
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.function_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
|
||||
let future = test::run_on(|| app.call(req));
|
||||
let response = test::block_on(future).unwrap();
|
||||
assert!(response.status().is_success());
|
||||
}
|
Loading…
Reference in New Issue
Block a user