diff --git a/src/app.rs b/src/app.rs index 76eca8f0..5d46166b 100755 --- a/src/app.rs +++ b/src/app.rs @@ -1,40 +1,59 @@ use actix::*; use actix_web::*; use futures::future::Future; +use std::cell::RefCell; use std::collections::HashMap; +use std::rc::Rc; +use super::coordinator_actor::CoordinatorActor; use super::db_executor::DbExecutor; use super::function_source::FunctionSources; use super::messages; use super::table_source::TableSources; use super::utils::{build_tilejson, parse_xyz}; +use super::worker_actor::WorkerActor; pub type Query = HashMap; pub struct State { db: Addr, - table_sources: Option, - function_sources: Option, + coordinator: Addr, + table_sources: Rc>>, + function_sources: Rc>>, } -// TODO: Swagger endpoint -fn get_table_sources(req: &HttpRequest) -> Result { +fn get_table_sources( + req: &HttpRequest, +) -> Result>> { let state = &req.state(); - let table_sources = state - .table_sources - .clone() - .ok_or_else(|| error::ErrorNotFound("There is no table sources"))?; + let coordinator = state.coordinator.clone(); - Ok(HttpResponse::Ok() - .header("Access-Control-Allow-Origin", "*") - .json(table_sources)) + let result = req.state().db.send(messages::GetTableSources {}); + + let response = result + .from_err() + .and_then(move |res| match res { + Ok(table_sources) => { + coordinator.do_send(messages::RefreshTableSources { + table_sources: Some(table_sources.clone()), + }); + + Ok(HttpResponse::Ok() + .header("Access-Control-Allow-Origin", "*") + .json(table_sources)) + } + Err(_) => Ok(HttpResponse::InternalServerError().into()), + }).responder(); + + Ok(response) } -// TODO: add properties to TileJSON endpoint fn get_table_source(req: &HttpRequest) -> Result { let state = &req.state(); + let table_sources = state .table_sources + .borrow() .clone() .ok_or_else(|| error::ErrorNotFound("There is no table sources"))?; @@ -64,8 +83,10 @@ fn get_table_source_tile( req: &HttpRequest, ) -> Result>> { let state = &req.state(); + let table_sources = state .table_sources + .borrow() .clone() .ok_or_else(|| error::ErrorNotFound("There is no table sources"))?; @@ -83,14 +104,16 @@ fn get_table_source_tile( let query = req.query(); - Ok(req - .state() - .db - .send(messages::GetTile { - xyz, - query: query.clone(), - source: source.clone(), - }).from_err() + let message = messages::GetTile { + xyz, + query: query.clone(), + source: source.clone(), + }; + + let result = req.state().db.send(message); + + let response = result + .from_err() .and_then(|res| match res { Ok(tile) => match tile.len() { 0 => Ok(HttpResponse::NoContent() @@ -103,25 +126,41 @@ fn get_table_source_tile( .body(tile)), }, Err(_) => Ok(HttpResponse::InternalServerError().into()), - }).responder()) + }).responder(); + + Ok(response) } -fn get_function_sources(req: &HttpRequest) -> Result { +fn get_function_sources( + req: &HttpRequest, +) -> Result>> { let state = &req.state(); - let function_sources = state - .function_sources - .clone() - .ok_or_else(|| error::ErrorNotFound("There is no function sources"))?; + let coordinator = state.coordinator.clone(); - Ok(HttpResponse::Ok() - .header("Access-Control-Allow-Origin", "*") - .json(function_sources)) + let result = req.state().db.send(messages::GetFunctionSources {}); + let response = result + .from_err() + .and_then(move |res| match res { + Ok(function_sources) => { + coordinator.do_send(messages::RefreshFunctionSources { + function_sources: Some(function_sources.clone()), + }); + + Ok(HttpResponse::Ok() + .header("Access-Control-Allow-Origin", "*") + .json(function_sources)) + } + Err(_) => Ok(HttpResponse::InternalServerError().into()), + }).responder(); + + Ok(response) } fn get_function_source(req: &HttpRequest) -> Result { let state = &req.state(); let function_sources = state .function_sources + .borrow() .clone() .ok_or_else(|| error::ErrorNotFound("There is no function sources"))?; @@ -153,6 +192,7 @@ fn get_function_source_tile( let state = &req.state(); let function_sources = state .function_sources + .borrow() .clone() .ok_or_else(|| error::ErrorNotFound("There is no function sources"))?; @@ -170,14 +210,16 @@ fn get_function_source_tile( let query = req.query(); - Ok(req - .state() - .db - .send(messages::GetTile { - xyz, - query: query.clone(), - source: source.clone(), - }).from_err() + let message = messages::GetTile { + xyz, + query: query.clone(), + source: source.clone(), + }; + + let result = req.state().db.send(message); + + let response = result + .from_err() .and_then(|res| match res { Ok(tile) => match tile.len() { 0 => Ok(HttpResponse::NoContent() @@ -190,18 +232,33 @@ fn get_function_source_tile( .body(tile)), }, Err(_) => Ok(HttpResponse::InternalServerError().into()), - }).responder()) + }).responder(); + + Ok(response) } pub fn new( - db_sync_arbiter: Addr, + db: Addr, + coordinator: Addr, table_sources: Option, function_sources: Option, ) -> App { + let table_sources_rc = Rc::new(RefCell::new(table_sources)); + let function_sources_rc = Rc::new(RefCell::new(function_sources)); + + let worker_actor = WorkerActor { + table_sources: table_sources_rc.clone(), + function_sources: function_sources_rc.clone(), + }; + + let worker: Addr<_> = worker_actor.start(); + coordinator.do_send(messages::Connect { addr: worker }); + let state = State { - db: db_sync_arbiter, - table_sources, - function_sources, + db, + coordinator, + table_sources: table_sources_rc.clone(), + function_sources: function_sources_rc.clone(), }; App::with_state(state) @@ -242,12 +299,26 @@ mod tests { test::TestServer::build_with_state(move || { let conn_string: String = env::var("DATABASE_URL").unwrap(); let pool = setup_connection_pool(&conn_string, None).unwrap(); - let db_sync_arbiter = SyncArbiter::start(3, move || DbExecutor(pool.clone())); + let db = SyncArbiter::start(3, move || DbExecutor(pool.clone())); + + let table_sources_rc = Rc::new(RefCell::new(table_sources.clone())); + let function_sources_rc = Rc::new(RefCell::new(function_sources.clone())); + + let coordinator: Addr<_> = CoordinatorActor::default().start(); + + let worker_actor = WorkerActor { + table_sources: table_sources_rc.clone(), + function_sources: function_sources_rc.clone(), + }; + + let worker: Addr<_> = worker_actor.start(); + coordinator.do_send(messages::Connect { addr: worker }); State { - db: db_sync_arbiter, - table_sources: table_sources.clone(), - function_sources: function_sources.clone(), + db, + coordinator, + table_sources: table_sources_rc.clone(), + function_sources: function_sources_rc.clone(), } }).start(|app| { app.resource("/index.json", |r| { diff --git a/src/coordinator_actor.rs b/src/coordinator_actor.rs new file mode 100644 index 00000000..4c36177e --- /dev/null +++ b/src/coordinator_actor.rs @@ -0,0 +1,62 @@ +use actix::prelude::*; + +use super::messages; +use super::worker_actor::WorkerActor; + +pub struct CoordinatorActor { + workers: Vec>, +} + +impl Default for CoordinatorActor { + fn default() -> CoordinatorActor { + CoordinatorActor { workers: vec![] } + } +} + +impl Actor for CoordinatorActor { + type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + info!("Starting CoordinatorActor"); + } +} + +impl Handler for CoordinatorActor { + type Result = Addr; + + fn handle(&mut self, msg: messages::Connect, _: &mut Context) -> Self::Result { + info!("WorkerActor connected"); + self.workers.push(msg.addr.clone()); + msg.addr + } +} + +impl Handler for CoordinatorActor { + type Result = (); + + fn handle(&mut self, msg: messages::RefreshTableSources, _: &mut Context) -> Self::Result { + for worker in &self.workers { + let message = messages::RefreshTableSources { + table_sources: msg.table_sources.clone(), + }; + worker.do_send(message); + } + } +} + +impl Handler for CoordinatorActor { + type Result = (); + + fn handle( + &mut self, + msg: messages::RefreshFunctionSources, + _: &mut Context, + ) -> Self::Result { + for worker in &self.workers { + let message = messages::RefreshFunctionSources { + function_sources: msg.function_sources.clone(), + }; + worker.do_send(message); + } + } +} diff --git a/src/db_executor.rs b/src/db_executor.rs index 975bc0a5..18d5c537 100644 --- a/src/db_executor.rs +++ b/src/db_executor.rs @@ -2,8 +2,10 @@ use actix::prelude::*; use std::io; use super::db::PostgresPool; +use super::function_source::{get_function_sources, FunctionSources}; use super::messages; use super::source::Tile; +use super::table_source::{get_table_sources, TableSources}; pub struct DbExecutor(pub PostgresPool); @@ -11,6 +13,26 @@ impl Actor for DbExecutor { type Context = SyncContext; } +impl Handler for DbExecutor { + type Result = Result; + + fn handle(&mut self, _msg: messages::GetTableSources, _: &mut Self::Context) -> Self::Result { + let conn = self.0.get().unwrap(); + let table_sources = get_table_sources(&conn)?; + Ok(table_sources) + } +} + +impl Handler for DbExecutor { + type Result = Result; + + fn handle(&mut self, _msg: messages::GetFunctionSources, _: &mut Self::Context) -> Self::Result { + let conn = self.0.get().unwrap(); + let function_sources = get_function_sources(&conn)?; + Ok(function_sources) + } +} + impl Handler for DbExecutor { type Result = Result; diff --git a/src/main.rs b/src/main.rs index abec7ed7..7178a0b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ extern crate serde_yaml; mod app; mod cli; mod config; +mod coordinator_actor; mod db; mod db_executor; mod function_source; @@ -30,6 +31,7 @@ mod server; mod source; mod table_source; mod utils; +mod worker_actor; use docopt::Docopt; use semver::Version; diff --git a/src/messages.rs b/src/messages.rs index 8eb3d0c6..5114c23f 100755 --- a/src/messages.rs +++ b/src/messages.rs @@ -2,7 +2,18 @@ use actix::prelude::*; use std::io; use super::app::Query; +use super::function_source::FunctionSources; use super::source::{Source, Tile, XYZ}; +use super::table_source::TableSources; +use super::worker_actor::WorkerActor; + +pub struct Connect { + pub addr: Addr, +} + +impl Message for Connect { + type Result = Addr; +} pub struct GetTile { pub xyz: XYZ, @@ -13,3 +24,29 @@ pub struct GetTile { impl Message for GetTile { type Result = Result; } + +pub struct GetTableSources {} +impl Message for GetTableSources { + type Result = Result; +} + +pub struct GetFunctionSources {} +impl Message for GetFunctionSources { + type Result = Result; +} + +pub struct RefreshTableSources { + pub table_sources: Option, +} + +impl Message for RefreshTableSources { + type Result = (); +} + +pub struct RefreshFunctionSources { + pub function_sources: Option, +} + +impl Message for RefreshFunctionSources { + type Result = (); +} diff --git a/src/server.rs b/src/server.rs index 2287e89a..83529e75 100755 --- a/src/server.rs +++ b/src/server.rs @@ -1,14 +1,17 @@ -use actix::{SyncArbiter, System, SystemRunner}; +use actix::{Actor, Addr, SyncArbiter, System, SystemRunner}; use actix_web::server; use super::app; use super::config::Config; +use super::coordinator_actor::CoordinatorActor; use super::db::PostgresPool; use super::db_executor::DbExecutor; pub fn new(config: Config, pool: PostgresPool) -> SystemRunner { let server = System::new("server"); - let db_sync_arbiter = SyncArbiter::start(3, move || DbExecutor(pool.clone())); + + let db = SyncArbiter::start(3, move || DbExecutor(pool.clone())); + let coordinator: Addr<_> = CoordinatorActor::default().start(); let keep_alive = config.keep_alive; let worker_processes = config.worker_processes; @@ -16,7 +19,8 @@ pub fn new(config: Config, pool: PostgresPool) -> SystemRunner { let _addr = server::new(move || { app::new( - db_sync_arbiter.clone(), + db.clone(), + coordinator.clone(), config.table_sources.clone(), config.function_sources.clone(), ) diff --git a/src/worker_actor.rs b/src/worker_actor.rs new file mode 100644 index 00000000..7bd8ddae --- /dev/null +++ b/src/worker_actor.rs @@ -0,0 +1,40 @@ +use actix::prelude::*; +use std::cell::RefCell; +use std::rc::Rc; + +use super::function_source::FunctionSources; +use super::messages; +use super::table_source::TableSources; + +pub struct WorkerActor { + pub table_sources: Rc>>, + pub function_sources: Rc>>, +} + +impl Actor for WorkerActor { + type Context = Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + info!("Starting WorkerActor"); + } +} + +impl Handler for WorkerActor { + type Result = (); + + fn handle(&mut self, msg: messages::RefreshTableSources, _: &mut Context) -> Self::Result { + *self.table_sources.borrow_mut() = msg.table_sources.clone(); + } +} + +impl Handler for WorkerActor { + type Result = (); + + fn handle( + &mut self, + msg: messages::RefreshFunctionSources, + _: &mut Context, + ) -> Self::Result { + *self.function_sources.borrow_mut() = msg.function_sources.clone(); + } +}