feat: 🎸 dynamic update of sources #12

Table and Function sources can now be refreshed when corresponding list
endpoints are requested
This commit is contained in:
Stepan Kuzmin 2018-11-28 16:13:31 +03:00
parent 1dae41abd8
commit 5f8936459e
7 changed files with 287 additions and 49 deletions

View File

@ -1,40 +1,59 @@
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
use futures::future::Future; use futures::future::Future;
use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::rc::Rc;
use super::coordinator_actor::CoordinatorActor;
use super::db_executor::DbExecutor; use super::db_executor::DbExecutor;
use super::function_source::FunctionSources; use super::function_source::FunctionSources;
use super::messages; use super::messages;
use super::table_source::TableSources; use super::table_source::TableSources;
use super::utils::{build_tilejson, parse_xyz}; use super::utils::{build_tilejson, parse_xyz};
use super::worker_actor::WorkerActor;
pub type Query = HashMap<String, String>; pub type Query = HashMap<String, String>;
pub struct State { pub struct State {
db: Addr<DbExecutor>, db: Addr<DbExecutor>,
table_sources: Option<TableSources>, coordinator: Addr<CoordinatorActor>,
function_sources: Option<FunctionSources>, table_sources: Rc<RefCell<Option<TableSources>>>,
function_sources: Rc<RefCell<Option<FunctionSources>>>,
} }
// TODO: Swagger endpoint fn get_table_sources(
fn get_table_sources(req: &HttpRequest<State>) -> Result<HttpResponse> { req: &HttpRequest<State>,
) -> Result<Box<Future<Item = HttpResponse, Error = Error>>> {
let state = &req.state(); let state = &req.state();
let table_sources = state let coordinator = state.coordinator.clone();
.table_sources
.clone() let result = req.state().db.send(messages::GetTableSources {});
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
let response = result
.from_err()
.and_then(move |res| match res {
Ok(table_sources) => {
coordinator.do_send(messages::RefreshTableSources {
table_sources: Some(table_sources.clone()),
});
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.header("Access-Control-Allow-Origin", "*") .header("Access-Control-Allow-Origin", "*")
.json(table_sources)) .json(table_sources))
} }
Err(_) => Ok(HttpResponse::InternalServerError().into()),
}).responder();
Ok(response)
}
// TODO: add properties to TileJSON endpoint
fn get_table_source(req: &HttpRequest<State>) -> Result<HttpResponse> { fn get_table_source(req: &HttpRequest<State>) -> Result<HttpResponse> {
let state = &req.state(); let state = &req.state();
let table_sources = state let table_sources = state
.table_sources .table_sources
.borrow()
.clone() .clone()
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?; .ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
@ -64,8 +83,10 @@ fn get_table_source_tile(
req: &HttpRequest<State>, req: &HttpRequest<State>,
) -> Result<Box<Future<Item = HttpResponse, Error = Error>>> { ) -> Result<Box<Future<Item = HttpResponse, Error = Error>>> {
let state = &req.state(); let state = &req.state();
let table_sources = state let table_sources = state
.table_sources .table_sources
.borrow()
.clone() .clone()
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?; .ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
@ -83,14 +104,16 @@ fn get_table_source_tile(
let query = req.query(); let query = req.query();
Ok(req let message = messages::GetTile {
.state()
.db
.send(messages::GetTile {
xyz, xyz,
query: query.clone(), query: query.clone(),
source: source.clone(), source: source.clone(),
}).from_err() };
let result = req.state().db.send(message);
let response = result
.from_err()
.and_then(|res| match res { .and_then(|res| match res {
Ok(tile) => match tile.len() { Ok(tile) => match tile.len() {
0 => Ok(HttpResponse::NoContent() 0 => Ok(HttpResponse::NoContent()
@ -103,25 +126,41 @@ fn get_table_source_tile(
.body(tile)), .body(tile)),
}, },
Err(_) => Ok(HttpResponse::InternalServerError().into()), Err(_) => Ok(HttpResponse::InternalServerError().into()),
}).responder()) }).responder();
Ok(response)
} }
fn get_function_sources(req: &HttpRequest<State>) -> Result<HttpResponse> { fn get_function_sources(
req: &HttpRequest<State>,
) -> Result<Box<Future<Item = HttpResponse, Error = Error>>> {
let state = &req.state(); let state = &req.state();
let function_sources = state let coordinator = state.coordinator.clone();
.function_sources
.clone() let result = req.state().db.send(messages::GetFunctionSources {});
.ok_or_else(|| error::ErrorNotFound("There is no function sources"))?; let response = result
.from_err()
.and_then(move |res| match res {
Ok(function_sources) => {
coordinator.do_send(messages::RefreshFunctionSources {
function_sources: Some(function_sources.clone()),
});
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.header("Access-Control-Allow-Origin", "*") .header("Access-Control-Allow-Origin", "*")
.json(function_sources)) .json(function_sources))
} }
Err(_) => Ok(HttpResponse::InternalServerError().into()),
}).responder();
Ok(response)
}
fn get_function_source(req: &HttpRequest<State>) -> Result<HttpResponse> { fn get_function_source(req: &HttpRequest<State>) -> Result<HttpResponse> {
let state = &req.state(); let state = &req.state();
let function_sources = state let function_sources = state
.function_sources .function_sources
.borrow()
.clone() .clone()
.ok_or_else(|| error::ErrorNotFound("There is no function sources"))?; .ok_or_else(|| error::ErrorNotFound("There is no function sources"))?;
@ -153,6 +192,7 @@ fn get_function_source_tile(
let state = &req.state(); let state = &req.state();
let function_sources = state let function_sources = state
.function_sources .function_sources
.borrow()
.clone() .clone()
.ok_or_else(|| error::ErrorNotFound("There is no function sources"))?; .ok_or_else(|| error::ErrorNotFound("There is no function sources"))?;
@ -170,14 +210,16 @@ fn get_function_source_tile(
let query = req.query(); let query = req.query();
Ok(req let message = messages::GetTile {
.state()
.db
.send(messages::GetTile {
xyz, xyz,
query: query.clone(), query: query.clone(),
source: source.clone(), source: source.clone(),
}).from_err() };
let result = req.state().db.send(message);
let response = result
.from_err()
.and_then(|res| match res { .and_then(|res| match res {
Ok(tile) => match tile.len() { Ok(tile) => match tile.len() {
0 => Ok(HttpResponse::NoContent() 0 => Ok(HttpResponse::NoContent()
@ -190,18 +232,33 @@ fn get_function_source_tile(
.body(tile)), .body(tile)),
}, },
Err(_) => Ok(HttpResponse::InternalServerError().into()), Err(_) => Ok(HttpResponse::InternalServerError().into()),
}).responder()) }).responder();
Ok(response)
} }
pub fn new( pub fn new(
db_sync_arbiter: Addr<DbExecutor>, db: Addr<DbExecutor>,
coordinator: Addr<CoordinatorActor>,
table_sources: Option<TableSources>, table_sources: Option<TableSources>,
function_sources: Option<FunctionSources>, function_sources: Option<FunctionSources>,
) -> App<State> { ) -> App<State> {
let table_sources_rc = Rc::new(RefCell::new(table_sources));
let function_sources_rc = Rc::new(RefCell::new(function_sources));
let worker_actor = WorkerActor {
table_sources: table_sources_rc.clone(),
function_sources: function_sources_rc.clone(),
};
let worker: Addr<_> = worker_actor.start();
coordinator.do_send(messages::Connect { addr: worker });
let state = State { let state = State {
db: db_sync_arbiter, db,
table_sources, coordinator,
function_sources, table_sources: table_sources_rc.clone(),
function_sources: function_sources_rc.clone(),
}; };
App::with_state(state) App::with_state(state)
@ -242,12 +299,26 @@ mod tests {
test::TestServer::build_with_state(move || { test::TestServer::build_with_state(move || {
let conn_string: String = env::var("DATABASE_URL").unwrap(); let conn_string: String = env::var("DATABASE_URL").unwrap();
let pool = setup_connection_pool(&conn_string, None).unwrap(); let pool = setup_connection_pool(&conn_string, None).unwrap();
let db_sync_arbiter = SyncArbiter::start(3, move || DbExecutor(pool.clone())); let db = SyncArbiter::start(3, move || DbExecutor(pool.clone()));
let table_sources_rc = Rc::new(RefCell::new(table_sources.clone()));
let function_sources_rc = Rc::new(RefCell::new(function_sources.clone()));
let coordinator: Addr<_> = CoordinatorActor::default().start();
let worker_actor = WorkerActor {
table_sources: table_sources_rc.clone(),
function_sources: function_sources_rc.clone(),
};
let worker: Addr<_> = worker_actor.start();
coordinator.do_send(messages::Connect { addr: worker });
State { State {
db: db_sync_arbiter, db,
table_sources: table_sources.clone(), coordinator,
function_sources: function_sources.clone(), table_sources: table_sources_rc.clone(),
function_sources: function_sources_rc.clone(),
} }
}).start(|app| { }).start(|app| {
app.resource("/index.json", |r| { app.resource("/index.json", |r| {

62
src/coordinator_actor.rs Normal file
View File

@ -0,0 +1,62 @@
use actix::prelude::*;
use super::messages;
use super::worker_actor::WorkerActor;
pub struct CoordinatorActor {
workers: Vec<Addr<WorkerActor>>,
}
impl Default for CoordinatorActor {
fn default() -> CoordinatorActor {
CoordinatorActor { workers: vec![] }
}
}
impl Actor for CoordinatorActor {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
info!("Starting CoordinatorActor");
}
}
impl Handler<messages::Connect> for CoordinatorActor {
type Result = Addr<WorkerActor>;
fn handle(&mut self, msg: messages::Connect, _: &mut Context<Self>) -> Self::Result {
info!("WorkerActor connected");
self.workers.push(msg.addr.clone());
msg.addr
}
}
impl Handler<messages::RefreshTableSources> for CoordinatorActor {
type Result = ();
fn handle(&mut self, msg: messages::RefreshTableSources, _: &mut Context<Self>) -> Self::Result {
for worker in &self.workers {
let message = messages::RefreshTableSources {
table_sources: msg.table_sources.clone(),
};
worker.do_send(message);
}
}
}
impl Handler<messages::RefreshFunctionSources> for CoordinatorActor {
type Result = ();
fn handle(
&mut self,
msg: messages::RefreshFunctionSources,
_: &mut Context<Self>,
) -> Self::Result {
for worker in &self.workers {
let message = messages::RefreshFunctionSources {
function_sources: msg.function_sources.clone(),
};
worker.do_send(message);
}
}
}

View File

@ -2,8 +2,10 @@ use actix::prelude::*;
use std::io; use std::io;
use super::db::PostgresPool; use super::db::PostgresPool;
use super::function_source::{get_function_sources, FunctionSources};
use super::messages; use super::messages;
use super::source::Tile; use super::source::Tile;
use super::table_source::{get_table_sources, TableSources};
pub struct DbExecutor(pub PostgresPool); pub struct DbExecutor(pub PostgresPool);
@ -11,6 +13,26 @@ impl Actor for DbExecutor {
type Context = SyncContext<Self>; type Context = SyncContext<Self>;
} }
impl Handler<messages::GetTableSources> for DbExecutor {
type Result = Result<TableSources, io::Error>;
fn handle(&mut self, _msg: messages::GetTableSources, _: &mut Self::Context) -> Self::Result {
let conn = self.0.get().unwrap();
let table_sources = get_table_sources(&conn)?;
Ok(table_sources)
}
}
impl Handler<messages::GetFunctionSources> for DbExecutor {
type Result = Result<FunctionSources, io::Error>;
fn handle(&mut self, _msg: messages::GetFunctionSources, _: &mut Self::Context) -> Self::Result {
let conn = self.0.get().unwrap();
let function_sources = get_function_sources(&conn)?;
Ok(function_sources)
}
}
impl Handler<messages::GetTile> for DbExecutor { impl Handler<messages::GetTile> for DbExecutor {
type Result = Result<Tile, io::Error>; type Result = Result<Tile, io::Error>;

View File

@ -22,6 +22,7 @@ extern crate serde_yaml;
mod app; mod app;
mod cli; mod cli;
mod config; mod config;
mod coordinator_actor;
mod db; mod db;
mod db_executor; mod db_executor;
mod function_source; mod function_source;
@ -30,6 +31,7 @@ mod server;
mod source; mod source;
mod table_source; mod table_source;
mod utils; mod utils;
mod worker_actor;
use docopt::Docopt; use docopt::Docopt;
use semver::Version; use semver::Version;

View File

@ -2,7 +2,18 @@ use actix::prelude::*;
use std::io; use std::io;
use super::app::Query; use super::app::Query;
use super::function_source::FunctionSources;
use super::source::{Source, Tile, XYZ}; use super::source::{Source, Tile, XYZ};
use super::table_source::TableSources;
use super::worker_actor::WorkerActor;
pub struct Connect {
pub addr: Addr<WorkerActor>,
}
impl Message for Connect {
type Result = Addr<WorkerActor>;
}
pub struct GetTile { pub struct GetTile {
pub xyz: XYZ, pub xyz: XYZ,
@ -13,3 +24,29 @@ pub struct GetTile {
impl Message for GetTile { impl Message for GetTile {
type Result = Result<Tile, io::Error>; type Result = Result<Tile, io::Error>;
} }
pub struct GetTableSources {}
impl Message for GetTableSources {
type Result = Result<TableSources, io::Error>;
}
pub struct GetFunctionSources {}
impl Message for GetFunctionSources {
type Result = Result<FunctionSources, io::Error>;
}
pub struct RefreshTableSources {
pub table_sources: Option<TableSources>,
}
impl Message for RefreshTableSources {
type Result = ();
}
pub struct RefreshFunctionSources {
pub function_sources: Option<FunctionSources>,
}
impl Message for RefreshFunctionSources {
type Result = ();
}

View File

@ -1,14 +1,17 @@
use actix::{SyncArbiter, System, SystemRunner}; use actix::{Actor, Addr, SyncArbiter, System, SystemRunner};
use actix_web::server; use actix_web::server;
use super::app; use super::app;
use super::config::Config; use super::config::Config;
use super::coordinator_actor::CoordinatorActor;
use super::db::PostgresPool; use super::db::PostgresPool;
use super::db_executor::DbExecutor; use super::db_executor::DbExecutor;
pub fn new(config: Config, pool: PostgresPool) -> SystemRunner { pub fn new(config: Config, pool: PostgresPool) -> SystemRunner {
let server = System::new("server"); let server = System::new("server");
let db_sync_arbiter = SyncArbiter::start(3, move || DbExecutor(pool.clone()));
let db = SyncArbiter::start(3, move || DbExecutor(pool.clone()));
let coordinator: Addr<_> = CoordinatorActor::default().start();
let keep_alive = config.keep_alive; let keep_alive = config.keep_alive;
let worker_processes = config.worker_processes; let worker_processes = config.worker_processes;
@ -16,7 +19,8 @@ pub fn new(config: Config, pool: PostgresPool) -> SystemRunner {
let _addr = server::new(move || { let _addr = server::new(move || {
app::new( app::new(
db_sync_arbiter.clone(), db.clone(),
coordinator.clone(),
config.table_sources.clone(), config.table_sources.clone(),
config.function_sources.clone(), config.function_sources.clone(),
) )

40
src/worker_actor.rs Normal file
View File

@ -0,0 +1,40 @@
use actix::prelude::*;
use std::cell::RefCell;
use std::rc::Rc;
use super::function_source::FunctionSources;
use super::messages;
use super::table_source::TableSources;
pub struct WorkerActor {
pub table_sources: Rc<RefCell<Option<TableSources>>>,
pub function_sources: Rc<RefCell<Option<FunctionSources>>>,
}
impl Actor for WorkerActor {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
info!("Starting WorkerActor");
}
}
impl Handler<messages::RefreshTableSources> for WorkerActor {
type Result = ();
fn handle(&mut self, msg: messages::RefreshTableSources, _: &mut Context<Self>) -> Self::Result {
*self.table_sources.borrow_mut() = msg.table_sources.clone();
}
}
impl Handler<messages::RefreshFunctionSources> for WorkerActor {
type Result = ();
fn handle(
&mut self,
msg: messages::RefreshFunctionSources,
_: &mut Context<Self>,
) -> Self::Result {
*self.function_sources.borrow_mut() = msg.function_sources.clone();
}
}