WIP: add coordinator

This commit is contained in:
Stepan Kuzmin 2018-03-27 14:40:33 +03:00
parent e2755187f9
commit 86c12064db
3 changed files with 42 additions and 6 deletions

View File

@ -17,6 +17,13 @@ PostGIS [Mapbox Vector Tiles](https://github.com/mapbox/vector-tile-spec) server
DATABASE_URL=postgres://postgres:password@localhost:5432/test martin
## Environment variables
DATABASE_URL
DATABASE_POOL_SIZE
WORKER_PROCESSES
KEEP_ALIVE
## Using with Docker
docker run -d —rm —name martin \

12
src/coordinator.rs Normal file
View File

@ -0,0 +1,12 @@
use actix::{msgs, Actor, Arbiter, Context};
pub struct CoordinatorActor;
impl Actor for CoordinatorActor {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
info!("CoordinatorActor is alive!");
Arbiter::system().do_send(msgs::SystemExit(0));
}
}

View File

@ -12,7 +12,7 @@ extern crate serde_derive;
extern crate serde_json;
use actix_web::HttpServer;
use actix::SyncArbiter;
use actix::{Actor, Addr, Syn, SyncArbiter};
use std::env;
use std::error::Error;
use std::io;
@ -21,18 +21,29 @@ mod db;
mod utils;
mod martin;
mod source;
// mod coordinator;
fn main() {
env_logger::init();
let conn_string: String = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let pool_size = env::var("POOL_SIZE")
let pool_size = env::var("DATABASE_POOL_SIZE")
.ok()
.and_then(|pool_size| pool_size.parse::<u32>().ok())
.unwrap_or(20);
info!("Connecting to {} with pool size {}", conn_string, pool_size);
let worker_processes = env::var("WORKER_PROCESSES")
.ok()
.and_then(|worker_processes| worker_processes.parse::<usize>().ok())
.unwrap_or(4);
let keep_alive = env::var("KEEP_ALIVE")
.ok()
.and_then(|keep_alive| keep_alive.parse::<usize>().ok())
.unwrap_or(75);
info!("Connecting to {}", conn_string);
let pool = match db::setup_connection_pool(&conn_string, pool_size) {
Ok(pool) => {
info!("Connected to postgres: {}", conn_string);
@ -55,17 +66,23 @@ fn main() {
}
};
let sys = actix::System::new("martin");
// let coordinator_system = actix::System::new("coordinator");
// let _addr: Addr<Syn, _> = coordinator::CoordinatorActor.start();
// coordinator_system.run();
let server = actix::System::new("server");
let db_sync_arbiter = SyncArbiter::start(3, move || db::DbExecutor(pool.clone()));
let port = 3000;
let bind_addr = format!("0.0.0.0:{}", port);
let _addr = HttpServer::new(move || martin::new(db_sync_arbiter.clone(), sources.clone()))
.bind(bind_addr.clone())
.expect(&format!("Can't bind to {}", bind_addr))
.expect(format!("Can't bind to {}", bind_addr))
.keep_alive(keep_alive)
.shutdown_timeout(0)
.threads(worker_processes)
.start();
let _ = sys.run();
let _ = server.run();
info!("Server has been started on {}.", bind_addr);
}