mirror of
https://github.com/maplibre/martin.git
synced 2024-12-18 20:31:54 +03:00
ci: refactor CI (#86)
* style: run fmt * ci: run release only on tags * ci: switch to actions/checkout@v2 * ci: add grcov * ci: update docker job
This commit is contained in:
parent
9887d2bb99
commit
1e34676a14
39
.github/workflows/ci.yml
vendored
39
.github/workflows/ci.yml
vendored
@ -1,6 +1,12 @@
|
||||
name: CI
|
||||
|
||||
on: [push]
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
|
||||
jobs:
|
||||
check:
|
||||
@ -8,7 +14,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v1
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Install stable toolchain
|
||||
uses: actions-rs/toolchain@v1
|
||||
@ -26,7 +32,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v1
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Install stable toolchain
|
||||
uses: actions-rs/toolchain@v1
|
||||
@ -48,7 +54,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v1
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Install stable toolchain
|
||||
uses: actions-rs/toolchain@v1
|
||||
@ -61,7 +67,6 @@ jobs:
|
||||
|
||||
- name: Run cargo fmt
|
||||
uses: actions-rs/cargo@v1
|
||||
continue-on-error: true # WARNING: only for this example, remove it!
|
||||
with:
|
||||
command: fmt
|
||||
args: --all -- --check
|
||||
@ -82,7 +87,7 @@ jobs:
|
||||
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
|
||||
steps:
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v1
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup database
|
||||
run: |
|
||||
@ -100,10 +105,11 @@ jobs:
|
||||
toolchain: stable
|
||||
override: true
|
||||
|
||||
- name: Run cargo test
|
||||
- name: Run tests
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: test
|
||||
args: --all
|
||||
env:
|
||||
DATABASE_URL: postgres://postgres@localhost:${{ job.services.postgres.ports[5432] }}/test
|
||||
|
||||
@ -127,14 +133,24 @@ jobs:
|
||||
./tests/vtzero-show function_source.pbf
|
||||
|
||||
docker:
|
||||
runs-on: ubuntu-latest
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
needs: [test]
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Build the Docker image
|
||||
run: docker build -t docker.pkg.github.com/urbica/martin/martin:latest .
|
||||
uses: docker/build-push-action@v1
|
||||
with:
|
||||
username: ${{ secrets.DOCKER_USERNAME }}
|
||||
password: ${{ secrets.DOCKER_PASSWORD }}
|
||||
repository: urbica/martin
|
||||
tag_with_ref: true
|
||||
push: ${{ startsWith(github.ref, 'refs/tags/') }}
|
||||
|
||||
release:
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
needs: [test]
|
||||
strategy:
|
||||
matrix:
|
||||
@ -164,7 +180,7 @@ jobs:
|
||||
override: true
|
||||
target: ${{ matrix.target }}
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v1
|
||||
uses: actions/checkout@v2
|
||||
- name: Run build
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
@ -187,7 +203,6 @@ jobs:
|
||||
run: shasum -a 256 ${{ matrix.name }}
|
||||
- name: Publish
|
||||
uses: softprops/action-gh-release@v1
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
with:
|
||||
draft: true
|
||||
files: "martin*"
|
||||
|
60
.github/workflows/grcov.yml
vendored
Normal file
60
.github/workflows/grcov.yml
vendored
Normal file
@ -0,0 +1,60 @@
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
pull_request:
|
||||
branches:
|
||||
- master
|
||||
|
||||
name: Code coverage
|
||||
|
||||
jobs:
|
||||
grcov:
|
||||
runs-on: ubuntu-latest
|
||||
services:
|
||||
postgres:
|
||||
image: mdillon/postgis:11-alpine
|
||||
env:
|
||||
POSTGRES_DB: test
|
||||
POSTGRES_USER: postgres
|
||||
ports:
|
||||
- 5432/tcp
|
||||
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
|
||||
steps:
|
||||
- name: Checkout sources
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup database
|
||||
run: |
|
||||
sudo apt-get install postgresql-client
|
||||
psql -h $POSTGRES_HOST -p $POSTGRES_PORT -U postgres -d test -f tests/fixtures/TileBBox.sql
|
||||
psql -h $POSTGRES_HOST -p $POSTGRES_PORT -U postgres -d test -f tests/fixtures/table_source.sql
|
||||
psql -h $POSTGRES_HOST -p $POSTGRES_PORT -U postgres -d test -f tests/fixtures/function_source.sql
|
||||
env:
|
||||
POSTGRES_HOST: localhost
|
||||
POSTGRES_PORT: ${{ job.services.postgres.ports[5432] }}
|
||||
|
||||
- name: Install nightly toolchain
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: nightly
|
||||
override: true
|
||||
|
||||
- name: Run tests
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: test
|
||||
args: --all
|
||||
env:
|
||||
CARGO_INCREMENTAL: 0
|
||||
RUSTFLAGS: '-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off -Cpanic=abort -Zpanic_abort_tests'
|
||||
DATABASE_URL: postgres://postgres@localhost:${{ job.services.postgres.ports[5432] }}/test
|
||||
|
||||
- name: Gather coverage data
|
||||
id: coverage
|
||||
uses: actions-rs/grcov@v0.1
|
||||
|
||||
- name: Codecov upload
|
||||
uses: codecov/codecov-action@v1
|
||||
with:
|
||||
file: ${{ steps.coverage.outputs.report }}
|
@ -1,6 +1,8 @@
|
||||
# Martin
|
||||
|
||||
[![CI](https://github.com/urbica/martin/workflows/CI/badge.svg)](https://github.com/urbica/martin/actions)
|
||||
![Security audit](https://github.com/urbica/martin/workflows/Security%20audit/badge.svg)
|
||||
[![codecov](https://codecov.io/gh/urbica/martin/branch/master/graph/badge.svg)](https://codecov.io/gh/urbica/martin)
|
||||
[![Docker pulls](https://img.shields.io/docker/pulls/urbica/martin.svg)](https://hub.docker.com/r/urbica/martin)
|
||||
[![Metadata](https://images.microbadger.com/badges/image/urbica/martin.svg)](https://microbadger.com/images/urbica/martin)
|
||||
|
||||
|
@ -7,30 +7,30 @@ use martin::dev::{mock_function_sources, mock_state, mock_table_sources};
|
||||
use martin::server::router;
|
||||
|
||||
fn criterion_benchmark(c: &mut Criterion) {
|
||||
let state = test::run_on(|| mock_state(mock_table_sources(), mock_function_sources()));
|
||||
let mut app = test::init_service(App::new().app_data(state).configure(router));
|
||||
let state = test::run_on(|| mock_state(mock_table_sources(), mock_function_sources()));
|
||||
let mut app = test::init_service(App::new().app_data(state).configure(router));
|
||||
|
||||
c.bench_function("/public.table_source/0/0/0.pbf", |b| {
|
||||
b.iter(|| {
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.table_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
c.bench_function("/public.table_source/0/0/0.pbf", |b| {
|
||||
b.iter(|| {
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.table_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
|
||||
let future = test::run_on(|| app.call(req));
|
||||
let _response = test::block_on(future).unwrap();
|
||||
})
|
||||
});
|
||||
let future = test::run_on(|| app.call(req));
|
||||
let _response = test::block_on(future).unwrap();
|
||||
})
|
||||
});
|
||||
|
||||
c.bench_function("/rpc/public.function_source/0/0/0.pbf", |b| {
|
||||
b.iter(|| {
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.function_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
c.bench_function("/rpc/public.function_source/0/0/0.pbf", |b| {
|
||||
b.iter(|| {
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.function_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
|
||||
let future = test::run_on(|| app.call(req));
|
||||
let _response = test::block_on(future).unwrap();
|
||||
})
|
||||
});
|
||||
let future = test::run_on(|| app.call(req));
|
||||
let _response = test::block_on(future).unwrap();
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
|
182
src/bin/main.rs
182
src/bin/main.rs
@ -6,7 +6,7 @@ use serde::Deserialize;
|
||||
use std::{env, io};
|
||||
|
||||
use martin::config::{read_config, Config, ConfigBuilder};
|
||||
use martin::db::{check_postgis_version, setup_connection_pool, get_connection, Pool};
|
||||
use martin::db::{check_postgis_version, get_connection, setup_connection_pool, Pool};
|
||||
use martin::function_source::get_function_sources;
|
||||
use martin::server;
|
||||
use martin::table_source::get_table_sources;
|
||||
@ -36,128 +36,124 @@ Options:
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Args {
|
||||
pub arg_connection: Option<String>,
|
||||
pub flag_config: Option<String>,
|
||||
pub flag_help: bool,
|
||||
pub flag_keep_alive: Option<usize>,
|
||||
pub flag_listen_addresses: Option<String>,
|
||||
pub flag_pool_size: Option<u32>,
|
||||
pub flag_watch: bool,
|
||||
pub flag_version: bool,
|
||||
pub flag_workers: Option<usize>,
|
||||
pub arg_connection: Option<String>,
|
||||
pub flag_config: Option<String>,
|
||||
pub flag_help: bool,
|
||||
pub flag_keep_alive: Option<usize>,
|
||||
pub flag_listen_addresses: Option<String>,
|
||||
pub flag_pool_size: Option<u32>,
|
||||
pub flag_watch: bool,
|
||||
pub flag_version: bool,
|
||||
pub flag_workers: Option<usize>,
|
||||
}
|
||||
|
||||
pub fn generate_config(
|
||||
args: Args,
|
||||
connection_string: String,
|
||||
pool: &Pool,
|
||||
) -> io::Result<Config> {
|
||||
let mut connection = get_connection(pool)?;
|
||||
let table_sources = get_table_sources(&mut connection)?;
|
||||
let function_sources = get_function_sources(&mut connection)?;
|
||||
pub fn generate_config(args: Args, connection_string: String, pool: &Pool) -> io::Result<Config> {
|
||||
let mut connection = get_connection(pool)?;
|
||||
let table_sources = get_table_sources(&mut connection)?;
|
||||
let function_sources = get_function_sources(&mut connection)?;
|
||||
|
||||
let config = ConfigBuilder {
|
||||
connection_string,
|
||||
watch: Some(args.flag_watch),
|
||||
keep_alive: args.flag_keep_alive,
|
||||
listen_addresses: args.flag_listen_addresses,
|
||||
pool_size: args.flag_pool_size,
|
||||
worker_processes: args.flag_workers,
|
||||
table_sources: Some(table_sources),
|
||||
function_sources: Some(function_sources),
|
||||
};
|
||||
let config = ConfigBuilder {
|
||||
connection_string,
|
||||
watch: Some(args.flag_watch),
|
||||
keep_alive: args.flag_keep_alive,
|
||||
listen_addresses: args.flag_listen_addresses,
|
||||
pool_size: args.flag_pool_size,
|
||||
worker_processes: args.flag_workers,
|
||||
table_sources: Some(table_sources),
|
||||
function_sources: Some(function_sources),
|
||||
};
|
||||
|
||||
let config = config.finalize();
|
||||
Ok(config)
|
||||
let config = config.finalize();
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
fn setup_from_config(file_name: String) -> Result<(Config, Pool), std::io::Error> {
|
||||
let config = read_config(&file_name).map_err(prettify_error("Can't read config"))?;
|
||||
let config = read_config(&file_name).map_err(prettify_error("Can't read config"))?;
|
||||
|
||||
let pool = setup_connection_pool(&config.connection_string, Some(config.pool_size))
|
||||
.map_err(prettify_error("Can't setup connection pool"))?;
|
||||
let pool = setup_connection_pool(&config.connection_string, Some(config.pool_size))
|
||||
.map_err(prettify_error("Can't setup connection pool"))?;
|
||||
|
||||
info!("Connected to {}", config.connection_string);
|
||||
info!("Connected to {}", config.connection_string);
|
||||
|
||||
Ok((config, pool))
|
||||
Ok((config, pool))
|
||||
}
|
||||
|
||||
fn setup_from_database(args: Args) -> Result<(Config, Pool), std::io::Error> {
|
||||
let connection_string = if args.arg_connection.is_some() {
|
||||
args.arg_connection.clone().unwrap()
|
||||
} else {
|
||||
env::var("DATABASE_URL").map_err(prettify_error("DATABASE_URL is not set"))?
|
||||
};
|
||||
let connection_string = if args.arg_connection.is_some() {
|
||||
args.arg_connection.clone().unwrap()
|
||||
} else {
|
||||
env::var("DATABASE_URL").map_err(prettify_error("DATABASE_URL is not set"))?
|
||||
};
|
||||
|
||||
let pool = setup_connection_pool(&connection_string, args.flag_pool_size)
|
||||
.map_err(prettify_error("Can't setup connection pool"))?;
|
||||
let pool = setup_connection_pool(&connection_string, args.flag_pool_size)
|
||||
.map_err(prettify_error("Can't setup connection pool"))?;
|
||||
|
||||
info!("Connected to {}", connection_string);
|
||||
info!("Connected to {}", connection_string);
|
||||
|
||||
let config = generate_config(args, connection_string, &pool)
|
||||
.map_err(prettify_error("Can't generate config"))?;
|
||||
let config = generate_config(args, connection_string, &pool)
|
||||
.map_err(prettify_error("Can't generate config"))?;
|
||||
|
||||
Ok((config, pool))
|
||||
Ok((config, pool))
|
||||
}
|
||||
|
||||
fn start(args: Args) -> Result<actix::SystemRunner, std::io::Error> {
|
||||
info!("Starting martin v{}", VERSION);
|
||||
info!("Starting martin v{}", VERSION);
|
||||
|
||||
let (config, pool) = match args.flag_config {
|
||||
Some(config_file_name) => {
|
||||
info!("Using {}", config_file_name);
|
||||
setup_from_config(config_file_name)?
|
||||
let (config, pool) = match args.flag_config {
|
||||
Some(config_file_name) => {
|
||||
info!("Using {}", config_file_name);
|
||||
setup_from_config(config_file_name)?
|
||||
}
|
||||
None => {
|
||||
info!("Config is not set, scanning database");
|
||||
setup_from_database(args)?
|
||||
}
|
||||
};
|
||||
|
||||
let matches = check_postgis_version(REQUIRED_POSTGIS_VERSION, &pool)
|
||||
.map_err(prettify_error("Can't check PostGIS version"))?;
|
||||
|
||||
if !matches {
|
||||
std::process::exit(-1);
|
||||
}
|
||||
None => {
|
||||
info!("Config is not set, scanning database");
|
||||
setup_from_database(args)?
|
||||
|
||||
let watch_mode = config.watch || env::var_os("WATCH_MODE").is_some();
|
||||
if watch_mode {
|
||||
info!("Watch mode enabled");
|
||||
}
|
||||
};
|
||||
|
||||
let matches = check_postgis_version(REQUIRED_POSTGIS_VERSION, &pool)
|
||||
.map_err(prettify_error("Can't check PostGIS version"))?;
|
||||
let listen_addresses = config.listen_addresses.clone();
|
||||
let server = server::new(pool, config, watch_mode);
|
||||
info!("Martin has been started on {}.", listen_addresses);
|
||||
|
||||
if !matches {
|
||||
std::process::exit(-1);
|
||||
}
|
||||
|
||||
let watch_mode = config.watch || env::var_os("WATCH_MODE").is_some();
|
||||
if watch_mode {
|
||||
info!("Watch mode enabled");
|
||||
}
|
||||
|
||||
let listen_addresses = config.listen_addresses.clone();
|
||||
let server = server::new(pool, config, watch_mode);
|
||||
info!("Martin has been started on {}.", listen_addresses);
|
||||
|
||||
Ok(server)
|
||||
Ok(server)
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "martin=info");
|
||||
env_logger::Builder::from_env(env).init();
|
||||
let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "martin=info");
|
||||
env_logger::Builder::from_env(env).init();
|
||||
|
||||
let args: Args = Docopt::new(USAGE)
|
||||
.and_then(|d| d.deserialize())
|
||||
.unwrap_or_else(|e| e.exit());
|
||||
let args: Args = Docopt::new(USAGE)
|
||||
.and_then(|d| d.deserialize())
|
||||
.unwrap_or_else(|e| e.exit());
|
||||
|
||||
if args.flag_help {
|
||||
println!("{}", USAGE);
|
||||
std::process::exit(0);
|
||||
}
|
||||
|
||||
if args.flag_version {
|
||||
println!("v{}", VERSION);
|
||||
std::process::exit(0);
|
||||
}
|
||||
|
||||
let server = match start(args) {
|
||||
Ok(server) => server,
|
||||
Err(error) => {
|
||||
error!("{}", error);
|
||||
std::process::exit(-1);
|
||||
if args.flag_help {
|
||||
println!("{}", USAGE);
|
||||
std::process::exit(0);
|
||||
}
|
||||
};
|
||||
|
||||
let _ = server.run();
|
||||
if args.flag_version {
|
||||
println!("v{}", VERSION);
|
||||
std::process::exit(0);
|
||||
}
|
||||
|
||||
let server = match start(args) {
|
||||
Ok(server) => server,
|
||||
Err(error) => {
|
||||
error!("{}", error);
|
||||
std::process::exit(-1);
|
||||
}
|
||||
};
|
||||
|
||||
let _ = server.run();
|
||||
}
|
||||
|
@ -8,52 +8,52 @@ use crate::table_source::TableSources;
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct Config {
|
||||
pub watch: bool,
|
||||
pub pool_size: u32,
|
||||
pub keep_alive: usize,
|
||||
pub worker_processes: usize,
|
||||
pub listen_addresses: String,
|
||||
pub connection_string: String,
|
||||
pub table_sources: Option<TableSources>,
|
||||
pub function_sources: Option<FunctionSources>,
|
||||
pub watch: bool,
|
||||
pub pool_size: u32,
|
||||
pub keep_alive: usize,
|
||||
pub worker_processes: usize,
|
||||
pub listen_addresses: String,
|
||||
pub connection_string: String,
|
||||
pub table_sources: Option<TableSources>,
|
||||
pub function_sources: Option<FunctionSources>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct ConfigBuilder {
|
||||
pub watch: Option<bool>,
|
||||
pub pool_size: Option<u32>,
|
||||
pub keep_alive: Option<usize>,
|
||||
pub worker_processes: Option<usize>,
|
||||
pub listen_addresses: Option<String>,
|
||||
pub connection_string: String,
|
||||
pub table_sources: Option<TableSources>,
|
||||
pub function_sources: Option<FunctionSources>,
|
||||
pub watch: Option<bool>,
|
||||
pub pool_size: Option<u32>,
|
||||
pub keep_alive: Option<usize>,
|
||||
pub worker_processes: Option<usize>,
|
||||
pub listen_addresses: Option<String>,
|
||||
pub connection_string: String,
|
||||
pub table_sources: Option<TableSources>,
|
||||
pub function_sources: Option<FunctionSources>,
|
||||
}
|
||||
|
||||
impl ConfigBuilder {
|
||||
pub fn finalize(self) -> Config {
|
||||
Config {
|
||||
watch: self.watch.unwrap_or(false),
|
||||
pool_size: self.pool_size.unwrap_or(20),
|
||||
keep_alive: self.keep_alive.unwrap_or(75),
|
||||
worker_processes: self.worker_processes.unwrap_or_else(num_cpus::get),
|
||||
listen_addresses: self
|
||||
.listen_addresses
|
||||
.unwrap_or_else(|| "0.0.0.0:3000".to_owned()),
|
||||
connection_string: self.connection_string,
|
||||
table_sources: self.table_sources,
|
||||
function_sources: self.function_sources,
|
||||
pub fn finalize(self) -> Config {
|
||||
Config {
|
||||
watch: self.watch.unwrap_or(false),
|
||||
pool_size: self.pool_size.unwrap_or(20),
|
||||
keep_alive: self.keep_alive.unwrap_or(75),
|
||||
worker_processes: self.worker_processes.unwrap_or_else(num_cpus::get),
|
||||
listen_addresses: self
|
||||
.listen_addresses
|
||||
.unwrap_or_else(|| "0.0.0.0:3000".to_owned()),
|
||||
connection_string: self.connection_string,
|
||||
table_sources: self.table_sources,
|
||||
function_sources: self.function_sources,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_config(file_name: &str) -> io::Result<Config> {
|
||||
let mut file = File::open(file_name)?;
|
||||
let mut contents = String::new();
|
||||
file.read_to_string(&mut contents)?;
|
||||
let mut file = File::open(file_name)?;
|
||||
let mut contents = String::new();
|
||||
file.read_to_string(&mut contents)?;
|
||||
|
||||
let config_builder: ConfigBuilder = serde_yaml::from_str(contents.as_str())
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let config_builder: ConfigBuilder = serde_yaml::from_str(contents.as_str())
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
Ok(config_builder.finalize())
|
||||
Ok(config_builder.finalize())
|
||||
}
|
||||
|
@ -4,54 +4,58 @@ use crate::messages;
|
||||
use crate::worker_actor::WorkerActor;
|
||||
|
||||
pub struct CoordinatorActor {
|
||||
workers: Vec<Addr<WorkerActor>>,
|
||||
workers: Vec<Addr<WorkerActor>>,
|
||||
}
|
||||
|
||||
impl Default for CoordinatorActor {
|
||||
fn default() -> CoordinatorActor {
|
||||
CoordinatorActor { workers: vec![] }
|
||||
}
|
||||
fn default() -> CoordinatorActor {
|
||||
CoordinatorActor { workers: vec![] }
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for CoordinatorActor {
|
||||
type Context = Context<Self>;
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
impl Handler<messages::Connect> for CoordinatorActor {
|
||||
type Result = Addr<WorkerActor>;
|
||||
type Result = Addr<WorkerActor>;
|
||||
|
||||
fn handle(&mut self, msg: messages::Connect, _: &mut Context<Self>) -> Self::Result {
|
||||
self.workers.push(msg.addr.clone());
|
||||
msg.addr
|
||||
}
|
||||
fn handle(&mut self, msg: messages::Connect, _: &mut Context<Self>) -> Self::Result {
|
||||
self.workers.push(msg.addr.clone());
|
||||
msg.addr
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<messages::RefreshTableSources> for CoordinatorActor {
|
||||
type Result = ();
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: messages::RefreshTableSources, _: &mut Context<Self>) -> Self::Result {
|
||||
for worker in &self.workers {
|
||||
let message = messages::RefreshTableSources {
|
||||
table_sources: msg.table_sources.clone(),
|
||||
};
|
||||
worker.do_send(message);
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: messages::RefreshTableSources,
|
||||
_: &mut Context<Self>,
|
||||
) -> Self::Result {
|
||||
for worker in &self.workers {
|
||||
let message = messages::RefreshTableSources {
|
||||
table_sources: msg.table_sources.clone(),
|
||||
};
|
||||
worker.do_send(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<messages::RefreshFunctionSources> for CoordinatorActor {
|
||||
type Result = ();
|
||||
type Result = ();
|
||||
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: messages::RefreshFunctionSources,
|
||||
_: &mut Context<Self>,
|
||||
) -> Self::Result {
|
||||
for worker in &self.workers {
|
||||
let message = messages::RefreshFunctionSources {
|
||||
function_sources: msg.function_sources.clone(),
|
||||
};
|
||||
worker.do_send(message);
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: messages::RefreshFunctionSources,
|
||||
_: &mut Context<Self>,
|
||||
) -> Self::Result {
|
||||
for worker in &self.workers {
|
||||
let message = messages::RefreshFunctionSources {
|
||||
function_sources: msg.function_sources.clone(),
|
||||
};
|
||||
worker.do_send(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
67
src/db.rs
67
src/db.rs
@ -10,58 +10,55 @@ pub type Pool = r2d2::Pool<PostgresConnectionManager<NoTls>>;
|
||||
pub type Connection = r2d2::PooledConnection<PostgresConnectionManager<NoTls>>;
|
||||
|
||||
pub fn setup_connection_pool(cn_str: &str, pool_size: Option<u32>) -> io::Result<Pool> {
|
||||
let config = postgres::config::Config::from_str(cn_str)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let config = postgres::config::Config::from_str(cn_str)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
let manager = PostgresConnectionManager::new(config, NoTls);
|
||||
let manager = PostgresConnectionManager::new(config, NoTls);
|
||||
|
||||
let pool = r2d2::Pool::builder()
|
||||
.max_size(pool_size.unwrap_or(20))
|
||||
.build(manager)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let pool = r2d2::Pool::builder()
|
||||
.max_size(pool_size.unwrap_or(20))
|
||||
.build(manager)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
Ok(pool)
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
pub fn get_connection(pool: &Pool) -> io::Result<Connection> {
|
||||
let connection = pool
|
||||
.get()
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let connection = pool
|
||||
.get()
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
Ok(connection)
|
||||
Ok(connection)
|
||||
}
|
||||
|
||||
pub fn select_postgis_verion(pool: &Pool) -> io::Result<String> {
|
||||
let mut connection = get_connection(pool)?;
|
||||
let mut connection = get_connection(pool)?;
|
||||
|
||||
let version = connection
|
||||
.query_one(include_str!("scripts/get_postgis_version.sql"), &[])
|
||||
.map(|row| row.get::<_, String>("postgis_version"))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let version = connection
|
||||
.query_one(include_str!("scripts/get_postgis_version.sql"), &[])
|
||||
.map(|row| row.get::<_, String>("postgis_version"))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
Ok(version)
|
||||
Ok(version)
|
||||
}
|
||||
|
||||
pub fn check_postgis_version(
|
||||
required_postgis_version: &str,
|
||||
pool: &Pool,
|
||||
) -> io::Result<bool> {
|
||||
let postgis_version = select_postgis_verion(&pool)?;
|
||||
pub fn check_postgis_version(required_postgis_version: &str, pool: &Pool) -> io::Result<bool> {
|
||||
let postgis_version = select_postgis_verion(&pool)?;
|
||||
|
||||
let req = VersionReq::parse(required_postgis_version)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let req = VersionReq::parse(required_postgis_version)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
let version = Version::parse(postgis_version.as_str())
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let version = Version::parse(postgis_version.as_str())
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
let matches = req.matches(&version);
|
||||
let matches = req.matches(&version);
|
||||
|
||||
if !matches {
|
||||
error!(
|
||||
"Martin requires PostGIS {}, current version is {}",
|
||||
required_postgis_version, postgis_version
|
||||
);
|
||||
}
|
||||
if !matches {
|
||||
error!(
|
||||
"Martin requires PostGIS {}, current version is {}",
|
||||
required_postgis_version, postgis_version
|
||||
);
|
||||
}
|
||||
|
||||
Ok(matches)
|
||||
Ok(matches)
|
||||
}
|
||||
|
@ -10,36 +10,40 @@ use crate::table_source::{get_table_sources, TableSources};
|
||||
pub struct DBActor(pub Pool);
|
||||
|
||||
impl Actor for DBActor {
|
||||
type Context = SyncContext<Self>;
|
||||
type Context = SyncContext<Self>;
|
||||
}
|
||||
|
||||
impl Handler<messages::GetTableSources> for DBActor {
|
||||
type Result = Result<TableSources, io::Error>;
|
||||
type Result = Result<TableSources, io::Error>;
|
||||
|
||||
fn handle(&mut self, _msg: messages::GetTableSources, _: &mut Self::Context) -> Self::Result {
|
||||
let mut connection = get_connection(&self.0)?;
|
||||
let table_sources = get_table_sources(&mut connection)?;
|
||||
Ok(table_sources)
|
||||
}
|
||||
fn handle(&mut self, _msg: messages::GetTableSources, _: &mut Self::Context) -> Self::Result {
|
||||
let mut connection = get_connection(&self.0)?;
|
||||
let table_sources = get_table_sources(&mut connection)?;
|
||||
Ok(table_sources)
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<messages::GetFunctionSources> for DBActor {
|
||||
type Result = Result<FunctionSources, io::Error>;
|
||||
type Result = Result<FunctionSources, io::Error>;
|
||||
|
||||
fn handle(&mut self, _msg: messages::GetFunctionSources, _: &mut Self::Context) -> Self::Result {
|
||||
let mut connection = get_connection(&self.0)?;
|
||||
let function_sources = get_function_sources(&mut connection)?;
|
||||
Ok(function_sources)
|
||||
}
|
||||
fn handle(
|
||||
&mut self,
|
||||
_msg: messages::GetFunctionSources,
|
||||
_: &mut Self::Context,
|
||||
) -> Self::Result {
|
||||
let mut connection = get_connection(&self.0)?;
|
||||
let function_sources = get_function_sources(&mut connection)?;
|
||||
Ok(function_sources)
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<messages::GetTile> for DBActor {
|
||||
type Result = Result<Tile, io::Error>;
|
||||
type Result = Result<Tile, io::Error>;
|
||||
|
||||
fn handle(&mut self, msg: messages::GetTile, _: &mut Self::Context) -> Self::Result {
|
||||
let mut connection = get_connection(&self.0)?;
|
||||
let tile = msg.source.get_tile(&mut connection, &msg.xyz, &msg.query)?;
|
||||
fn handle(&mut self, msg: messages::GetTile, _: &mut Self::Context) -> Self::Result {
|
||||
let mut connection = get_connection(&self.0)?;
|
||||
let tile = msg.source.get_tile(&mut connection, &msg.xyz, &msg.query)?;
|
||||
|
||||
Ok(tile)
|
||||
}
|
||||
Ok(tile)
|
||||
}
|
||||
}
|
||||
|
86
src/dev.rs
86
src/dev.rs
@ -13,60 +13,60 @@ use crate::server::AppState;
|
||||
use crate::table_source::{TableSource, TableSources};
|
||||
|
||||
pub fn mock_table_sources() -> Option<TableSources> {
|
||||
let id = "public.table_source";
|
||||
let source = TableSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
table: "table_source".to_owned(),
|
||||
id_column: None,
|
||||
geometry_column: "geom".to_owned(),
|
||||
srid: 3857,
|
||||
extent: Some(4096),
|
||||
buffer: Some(64),
|
||||
clip_geom: Some(true),
|
||||
geometry_type: None,
|
||||
properties: HashMap::new(),
|
||||
};
|
||||
let id = "public.table_source";
|
||||
let source = TableSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
table: "table_source".to_owned(),
|
||||
id_column: None,
|
||||
geometry_column: "geom".to_owned(),
|
||||
srid: 3857,
|
||||
extent: Some(4096),
|
||||
buffer: Some(64),
|
||||
clip_geom: Some(true),
|
||||
geometry_type: None,
|
||||
properties: HashMap::new(),
|
||||
};
|
||||
|
||||
let mut table_sources: TableSources = HashMap::new();
|
||||
table_sources.insert(id.to_owned(), Box::new(source));
|
||||
Some(table_sources)
|
||||
let mut table_sources: TableSources = HashMap::new();
|
||||
table_sources.insert(id.to_owned(), Box::new(source));
|
||||
Some(table_sources)
|
||||
}
|
||||
|
||||
pub fn mock_function_sources() -> Option<FunctionSources> {
|
||||
let id = "public.function_source";
|
||||
let source = FunctionSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
function: "function_source".to_owned(),
|
||||
};
|
||||
let id = "public.function_source";
|
||||
let source = FunctionSource {
|
||||
id: id.to_owned(),
|
||||
schema: "public".to_owned(),
|
||||
function: "function_source".to_owned(),
|
||||
};
|
||||
|
||||
let mut function_sources: FunctionSources = HashMap::new();
|
||||
function_sources.insert(id.to_owned(), Box::new(source));
|
||||
Some(function_sources)
|
||||
let mut function_sources: FunctionSources = HashMap::new();
|
||||
function_sources.insert(id.to_owned(), Box::new(source));
|
||||
Some(function_sources)
|
||||
}
|
||||
|
||||
pub fn mock_state(
|
||||
table_sources: Option<TableSources>,
|
||||
function_sources: Option<FunctionSources>,
|
||||
table_sources: Option<TableSources>,
|
||||
function_sources: Option<FunctionSources>,
|
||||
) -> AppState {
|
||||
let connection_string: String = env::var("DATABASE_URL").unwrap();
|
||||
info!("Connecting to {}", connection_string);
|
||||
let connection_string: String = env::var("DATABASE_URL").unwrap();
|
||||
info!("Connecting to {}", connection_string);
|
||||
|
||||
let pool = setup_connection_pool(&connection_string, Some(1)).unwrap();
|
||||
info!("Connected to {}", connection_string);
|
||||
let pool = setup_connection_pool(&connection_string, Some(1)).unwrap();
|
||||
info!("Connected to {}", connection_string);
|
||||
|
||||
let db = SyncArbiter::start(3, move || DBActor(pool.clone()));
|
||||
let coordinator: Addr<_> = CoordinatorActor::default().start();
|
||||
let db = SyncArbiter::start(3, move || DBActor(pool.clone()));
|
||||
let coordinator: Addr<_> = CoordinatorActor::default().start();
|
||||
|
||||
let table_sources = Rc::new(RefCell::new(table_sources));
|
||||
let function_sources = Rc::new(RefCell::new(function_sources));
|
||||
let table_sources = Rc::new(RefCell::new(table_sources));
|
||||
let function_sources = Rc::new(RefCell::new(function_sources));
|
||||
|
||||
AppState {
|
||||
db,
|
||||
coordinator,
|
||||
table_sources,
|
||||
function_sources,
|
||||
watch_mode: false,
|
||||
}
|
||||
AppState {
|
||||
db,
|
||||
coordinator,
|
||||
table_sources,
|
||||
function_sources,
|
||||
watch_mode: false,
|
||||
}
|
||||
}
|
||||
|
@ -9,81 +9,81 @@ use crate::utils::query_to_json_string;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct FunctionSource {
|
||||
pub id: String,
|
||||
pub schema: String,
|
||||
pub function: String,
|
||||
pub id: String,
|
||||
pub schema: String,
|
||||
pub function: String,
|
||||
}
|
||||
|
||||
pub type FunctionSources = HashMap<String, Box<FunctionSource>>;
|
||||
|
||||
impl Source for FunctionSource {
|
||||
fn get_id(&self) -> &str {
|
||||
self.id.as_str()
|
||||
}
|
||||
fn get_id(&self) -> &str {
|
||||
self.id.as_str()
|
||||
}
|
||||
|
||||
fn get_tilejson(&self) -> Result<TileJSON, io::Error> {
|
||||
let mut tilejson_builder = TileJSONBuilder::new();
|
||||
fn get_tilejson(&self) -> Result<TileJSON, io::Error> {
|
||||
let mut tilejson_builder = TileJSONBuilder::new();
|
||||
|
||||
tilejson_builder.scheme("xyz");
|
||||
tilejson_builder.name(&self.id);
|
||||
tilejson_builder.tiles(vec![]);
|
||||
tilejson_builder.scheme("xyz");
|
||||
tilejson_builder.name(&self.id);
|
||||
tilejson_builder.tiles(vec![]);
|
||||
|
||||
Ok(tilejson_builder.finalize())
|
||||
}
|
||||
Ok(tilejson_builder.finalize())
|
||||
}
|
||||
|
||||
fn get_tile(
|
||||
&self,
|
||||
conn: &mut Connection,
|
||||
xyz: &XYZ,
|
||||
query: &Option<Query>,
|
||||
) -> Result<Tile, io::Error> {
|
||||
let empty_query = HashMap::new();
|
||||
let query = query.as_ref().unwrap_or(&empty_query);
|
||||
fn get_tile(
|
||||
&self,
|
||||
conn: &mut Connection,
|
||||
xyz: &XYZ,
|
||||
query: &Option<Query>,
|
||||
) -> Result<Tile, io::Error> {
|
||||
let empty_query = HashMap::new();
|
||||
let query = query.as_ref().unwrap_or(&empty_query);
|
||||
|
||||
let query_json_string =
|
||||
query_to_json_string(&query).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
let query_json_string = query_to_json_string(&query)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
|
||||
let query = format!(
|
||||
include_str!("scripts/call_rpc.sql"),
|
||||
schema = self.schema,
|
||||
function = self.function,
|
||||
z = xyz.z,
|
||||
x = xyz.x,
|
||||
y = xyz.y,
|
||||
query_params = query_json_string
|
||||
);
|
||||
let query = format!(
|
||||
include_str!("scripts/call_rpc.sql"),
|
||||
schema = self.schema,
|
||||
function = self.function,
|
||||
z = xyz.z,
|
||||
x = xyz.x,
|
||||
y = xyz.y,
|
||||
query_params = query_json_string
|
||||
);
|
||||
|
||||
let tile: Tile = conn
|
||||
.query_one(query.as_str(), &[])
|
||||
.map(|row| row.get(self.function.as_str()))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
let tile: Tile = conn
|
||||
.query_one(query.as_str(), &[])
|
||||
.map(|row| row.get(self.function.as_str()))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
|
||||
Ok(tile)
|
||||
}
|
||||
Ok(tile)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_function_sources(conn: &mut Connection) -> Result<FunctionSources, io::Error> {
|
||||
let mut sources = HashMap::new();
|
||||
let mut sources = HashMap::new();
|
||||
|
||||
let rows = conn
|
||||
.query(include_str!("scripts/get_function_sources.sql"), &[])
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let rows = conn
|
||||
.query(include_str!("scripts/get_function_sources.sql"), &[])
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
for row in &rows {
|
||||
let schema: String = row.get("specific_schema");
|
||||
let function: String = row.get("routine_name");
|
||||
let id = format!("{}.{}", schema, function);
|
||||
for row in &rows {
|
||||
let schema: String = row.get("specific_schema");
|
||||
let function: String = row.get("routine_name");
|
||||
let id = format!("{}.{}", schema, function);
|
||||
|
||||
info!("Found {} function source", id);
|
||||
info!("Found {} function source", id);
|
||||
|
||||
let source = FunctionSource {
|
||||
id: id.clone(),
|
||||
schema,
|
||||
function,
|
||||
};
|
||||
let source = FunctionSource {
|
||||
id: id.clone(),
|
||||
schema,
|
||||
function,
|
||||
};
|
||||
|
||||
sources.insert(id, Box::new(source));
|
||||
}
|
||||
sources.insert(id, Box::new(source));
|
||||
}
|
||||
|
||||
Ok(sources)
|
||||
Ok(sources)
|
||||
}
|
||||
|
@ -7,45 +7,45 @@ use crate::table_source::TableSources;
|
||||
use crate::worker_actor::WorkerActor;
|
||||
|
||||
pub struct Connect {
|
||||
pub addr: Addr<WorkerActor>,
|
||||
pub addr: Addr<WorkerActor>,
|
||||
}
|
||||
|
||||
impl Message for Connect {
|
||||
type Result = Addr<WorkerActor>;
|
||||
type Result = Addr<WorkerActor>;
|
||||
}
|
||||
|
||||
pub struct GetTile {
|
||||
pub xyz: XYZ,
|
||||
pub query: Option<Query>,
|
||||
pub source: Box<dyn Source + Send>,
|
||||
pub xyz: XYZ,
|
||||
pub query: Option<Query>,
|
||||
pub source: Box<dyn Source + Send>,
|
||||
}
|
||||
|
||||
impl Message for GetTile {
|
||||
type Result = Result<Tile, io::Error>;
|
||||
type Result = Result<Tile, io::Error>;
|
||||
}
|
||||
|
||||
pub struct GetTableSources {}
|
||||
impl Message for GetTableSources {
|
||||
type Result = Result<TableSources, io::Error>;
|
||||
type Result = Result<TableSources, io::Error>;
|
||||
}
|
||||
|
||||
pub struct GetFunctionSources {}
|
||||
impl Message for GetFunctionSources {
|
||||
type Result = Result<FunctionSources, io::Error>;
|
||||
type Result = Result<FunctionSources, io::Error>;
|
||||
}
|
||||
|
||||
pub struct RefreshTableSources {
|
||||
pub table_sources: Option<TableSources>,
|
||||
pub table_sources: Option<TableSources>,
|
||||
}
|
||||
|
||||
impl Message for RefreshTableSources {
|
||||
type Result = ();
|
||||
type Result = ();
|
||||
}
|
||||
|
||||
pub struct RefreshFunctionSources {
|
||||
pub function_sources: Option<FunctionSources>,
|
||||
pub function_sources: Option<FunctionSources>,
|
||||
}
|
||||
|
||||
impl Message for RefreshFunctionSources {
|
||||
type Result = ();
|
||||
type Result = ();
|
||||
}
|
||||
|
@ -11,17 +11,22 @@ pub type Query = HashMap<String, String>;
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct XYZ {
|
||||
pub z: u32,
|
||||
pub x: u32,
|
||||
pub y: u32,
|
||||
pub z: u32,
|
||||
pub x: u32,
|
||||
pub y: u32,
|
||||
}
|
||||
|
||||
pub trait Source: Debug {
|
||||
fn get_id(&self) -> &str;
|
||||
fn get_id(&self) -> &str;
|
||||
|
||||
fn get_tilejson(&self) -> Result<TileJSON, io::Error>;
|
||||
fn get_tilejson(&self) -> Result<TileJSON, io::Error>;
|
||||
|
||||
fn get_tile(&self, conn: &mut Connection, xyz: &XYZ, query: &Option<Query>) -> Result<Tile, io::Error>;
|
||||
fn get_tile(
|
||||
&self,
|
||||
conn: &mut Connection,
|
||||
xyz: &XYZ,
|
||||
query: &Option<Query>,
|
||||
) -> Result<Tile, io::Error>;
|
||||
}
|
||||
|
||||
// pub type Sources = HashMap<String, Box<dyn Source>>;
|
||||
|
@ -10,91 +10,91 @@ use crate::utils;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct TableSource {
|
||||
pub id: String,
|
||||
pub schema: String,
|
||||
pub table: String,
|
||||
pub id_column: Option<String>,
|
||||
pub geometry_column: String,
|
||||
pub srid: u32,
|
||||
pub extent: Option<u32>,
|
||||
pub buffer: Option<u32>,
|
||||
pub clip_geom: Option<bool>,
|
||||
pub geometry_type: Option<String>,
|
||||
pub properties: HashMap<String, String>,
|
||||
pub id: String,
|
||||
pub schema: String,
|
||||
pub table: String,
|
||||
pub id_column: Option<String>,
|
||||
pub geometry_column: String,
|
||||
pub srid: u32,
|
||||
pub extent: Option<u32>,
|
||||
pub buffer: Option<u32>,
|
||||
pub clip_geom: Option<bool>,
|
||||
pub geometry_type: Option<String>,
|
||||
pub properties: HashMap<String, String>,
|
||||
}
|
||||
|
||||
pub type TableSources = HashMap<String, Box<TableSource>>;
|
||||
|
||||
impl Source for TableSource {
|
||||
fn get_id(&self) -> &str {
|
||||
self.id.as_str()
|
||||
}
|
||||
fn get_id(&self) -> &str {
|
||||
self.id.as_str()
|
||||
}
|
||||
|
||||
fn get_tilejson(&self) -> Result<TileJSON, io::Error> {
|
||||
let mut tilejson_builder = TileJSONBuilder::new();
|
||||
fn get_tilejson(&self) -> Result<TileJSON, io::Error> {
|
||||
let mut tilejson_builder = TileJSONBuilder::new();
|
||||
|
||||
tilejson_builder.scheme("xyz");
|
||||
tilejson_builder.name(&self.id);
|
||||
tilejson_builder.scheme("xyz");
|
||||
tilejson_builder.name(&self.id);
|
||||
|
||||
Ok(tilejson_builder.finalize())
|
||||
}
|
||||
Ok(tilejson_builder.finalize())
|
||||
}
|
||||
|
||||
fn get_tile(
|
||||
&self,
|
||||
conn: &mut Connection,
|
||||
xyz: &XYZ,
|
||||
_query: &Option<Query>,
|
||||
) -> Result<Tile, io::Error> {
|
||||
let mercator_bounds = utils::tilebbox(xyz);
|
||||
fn get_tile(
|
||||
&self,
|
||||
conn: &mut Connection,
|
||||
xyz: &XYZ,
|
||||
_query: &Option<Query>,
|
||||
) -> Result<Tile, io::Error> {
|
||||
let mercator_bounds = utils::tilebbox(xyz);
|
||||
|
||||
let (geometry_column_mercator, original_bounds) = if self.srid == 3857 {
|
||||
(self.geometry_column.clone(), mercator_bounds.clone())
|
||||
} else {
|
||||
(
|
||||
format!("ST_Transform({0}, 3857)", self.geometry_column),
|
||||
format!("ST_Transform({0}, {1})", mercator_bounds, self.srid),
|
||||
)
|
||||
};
|
||||
let (geometry_column_mercator, original_bounds) = if self.srid == 3857 {
|
||||
(self.geometry_column.clone(), mercator_bounds.clone())
|
||||
} else {
|
||||
(
|
||||
format!("ST_Transform({0}, 3857)", self.geometry_column),
|
||||
format!("ST_Transform({0}, {1})", mercator_bounds, self.srid),
|
||||
)
|
||||
};
|
||||
|
||||
let properties = if self.properties.is_empty() {
|
||||
"".to_string()
|
||||
} else {
|
||||
let properties = self
|
||||
.properties
|
||||
.keys()
|
||||
.map(|column| format!("\"{0}\"", column))
|
||||
.collect::<Vec<String>>()
|
||||
.join(",");
|
||||
let properties = if self.properties.is_empty() {
|
||||
"".to_string()
|
||||
} else {
|
||||
let properties = self
|
||||
.properties
|
||||
.keys()
|
||||
.map(|column| format!("\"{0}\"", column))
|
||||
.collect::<Vec<String>>()
|
||||
.join(",");
|
||||
|
||||
format!(", {0}", properties)
|
||||
};
|
||||
format!(", {0}", properties)
|
||||
};
|
||||
|
||||
let id_column = self
|
||||
.id_column
|
||||
.clone()
|
||||
.map_or("".to_string(), |id_column| format!(", '{}'", id_column));
|
||||
let id_column = self
|
||||
.id_column
|
||||
.clone()
|
||||
.map_or("".to_string(), |id_column| format!(", '{}'", id_column));
|
||||
|
||||
let query = format!(
|
||||
include_str!("scripts/get_tile.sql"),
|
||||
id = self.id,
|
||||
id_column = id_column,
|
||||
geometry_column = self.geometry_column,
|
||||
geometry_column_mercator = geometry_column_mercator,
|
||||
original_bounds = original_bounds,
|
||||
mercator_bounds = mercator_bounds,
|
||||
extent = self.extent.unwrap_or(DEFAULT_EXTENT),
|
||||
buffer = self.buffer.unwrap_or(DEFAULT_BUFFER),
|
||||
clip_geom = self.clip_geom.unwrap_or(DEFAULT_CLIP_GEOM),
|
||||
properties = properties
|
||||
);
|
||||
let query = format!(
|
||||
include_str!("scripts/get_tile.sql"),
|
||||
id = self.id,
|
||||
id_column = id_column,
|
||||
geometry_column = self.geometry_column,
|
||||
geometry_column_mercator = geometry_column_mercator,
|
||||
original_bounds = original_bounds,
|
||||
mercator_bounds = mercator_bounds,
|
||||
extent = self.extent.unwrap_or(DEFAULT_EXTENT),
|
||||
buffer = self.buffer.unwrap_or(DEFAULT_BUFFER),
|
||||
clip_geom = self.clip_geom.unwrap_or(DEFAULT_CLIP_GEOM),
|
||||
properties = properties
|
||||
);
|
||||
|
||||
let tile: Tile = conn
|
||||
.query_one(query.as_str(), &[])
|
||||
.map(|row| row.get("st_asmvt"))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let tile: Tile = conn
|
||||
.query_one(query.as_str(), &[])
|
||||
.map(|row| row.get("st_asmvt"))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
Ok(tile)
|
||||
}
|
||||
Ok(tile)
|
||||
}
|
||||
}
|
||||
|
||||
static DEFAULT_EXTENT: u32 = 4096;
|
||||
@ -102,45 +102,45 @@ static DEFAULT_BUFFER: u32 = 64;
|
||||
static DEFAULT_CLIP_GEOM: bool = true;
|
||||
|
||||
pub fn get_table_sources(conn: &mut Connection) -> Result<TableSources, io::Error> {
|
||||
let mut sources = HashMap::new();
|
||||
let mut sources = HashMap::new();
|
||||
|
||||
let rows = conn
|
||||
.query(include_str!("scripts/get_table_sources.sql"), &[])
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
let rows = conn
|
||||
.query(include_str!("scripts/get_table_sources.sql"), &[])
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
|
||||
|
||||
for row in &rows {
|
||||
let schema: String = row.get("f_table_schema");
|
||||
let table: String = row.get("f_table_name");
|
||||
let id = format!("{}.{}", schema, table);
|
||||
for row in &rows {
|
||||
let schema: String = row.get("f_table_schema");
|
||||
let table: String = row.get("f_table_name");
|
||||
let id = format!("{}.{}", schema, table);
|
||||
|
||||
let geometry_column: String = row.get("f_geometry_column");
|
||||
let srid: i32 = row.get("srid");
|
||||
let geometry_column: String = row.get("f_geometry_column");
|
||||
let srid: i32 = row.get("srid");
|
||||
|
||||
info!("Found {} table source", id);
|
||||
info!("Found {} table source", id);
|
||||
|
||||
if srid == 0 {
|
||||
warn!("{} has SRID 0, skipping", id);
|
||||
continue;
|
||||
if srid == 0 {
|
||||
warn!("{} has SRID 0, skipping", id);
|
||||
continue;
|
||||
}
|
||||
|
||||
let properties = utils::json_to_hashmap(&row.get("properties"));
|
||||
|
||||
let source = TableSource {
|
||||
id: id.to_string(),
|
||||
schema,
|
||||
table,
|
||||
id_column: None,
|
||||
geometry_column,
|
||||
srid: srid as u32,
|
||||
extent: Some(DEFAULT_EXTENT),
|
||||
buffer: Some(DEFAULT_BUFFER),
|
||||
clip_geom: Some(DEFAULT_CLIP_GEOM),
|
||||
geometry_type: row.get("type"),
|
||||
properties,
|
||||
};
|
||||
|
||||
sources.insert(id, Box::new(source));
|
||||
}
|
||||
|
||||
let properties = utils::json_to_hashmap(&row.get("properties"));
|
||||
|
||||
let source = TableSource {
|
||||
id: id.to_string(),
|
||||
schema,
|
||||
table,
|
||||
id_column: None,
|
||||
geometry_column,
|
||||
srid: srid as u32,
|
||||
extent: Some(DEFAULT_EXTENT),
|
||||
buffer: Some(DEFAULT_BUFFER),
|
||||
clip_geom: Some(DEFAULT_CLIP_GEOM),
|
||||
geometry_type: row.get("type"),
|
||||
properties,
|
||||
};
|
||||
|
||||
sources.insert(id, Box::new(source));
|
||||
}
|
||||
|
||||
Ok(sources)
|
||||
Ok(sources)
|
||||
}
|
||||
|
56
src/utils.rs
56
src/utils.rs
@ -3,49 +3,49 @@ use std::collections::HashMap;
|
||||
use crate::source::{Query, XYZ};
|
||||
|
||||
pub fn prettify_error<E: std::fmt::Display>(message: &'static str) -> impl Fn(E) -> std::io::Error {
|
||||
move |error| std::io::Error::new(std::io::ErrorKind::Other, format!("{}: {}", message, error))
|
||||
move |error| std::io::Error::new(std::io::ErrorKind::Other, format!("{}: {}", message, error))
|
||||
}
|
||||
|
||||
// https://github.com/mapbox/postgis-vt-util/blob/master/src/TileBBox.sql
|
||||
pub fn tilebbox(xyz: &XYZ) -> String {
|
||||
let x = xyz.x;
|
||||
let y = xyz.y;
|
||||
let z = xyz.z;
|
||||
let x = xyz.x;
|
||||
let y = xyz.y;
|
||||
let z = xyz.z;
|
||||
|
||||
let max = 20_037_508.34;
|
||||
let res = (max * 2.0) / f64::from(2_i32.pow(z));
|
||||
let max = 20_037_508.34;
|
||||
let res = (max * 2.0) / f64::from(2_i32.pow(z));
|
||||
|
||||
let xmin = -max + (f64::from(x) * res);
|
||||
let ymin = max - (f64::from(y) * res);
|
||||
let xmax = -max + (f64::from(x) * res) + res;
|
||||
let ymax = max - (f64::from(y) * res) - res;
|
||||
let xmin = -max + (f64::from(x) * res);
|
||||
let ymin = max - (f64::from(y) * res);
|
||||
let xmax = -max + (f64::from(x) * res) + res;
|
||||
let ymax = max - (f64::from(y) * res) - res;
|
||||
|
||||
format!(
|
||||
"ST_MakeEnvelope({0}, {1}, {2}, {3}, 3857)",
|
||||
xmin, ymin, xmax, ymax
|
||||
)
|
||||
format!(
|
||||
"ST_MakeEnvelope({0}, {1}, {2}, {3}, 3857)",
|
||||
xmin, ymin, xmax, ymax
|
||||
)
|
||||
}
|
||||
|
||||
pub fn json_to_hashmap(value: &serde_json::Value) -> HashMap<String, String> {
|
||||
let mut hashmap = HashMap::new();
|
||||
let mut hashmap = HashMap::new();
|
||||
|
||||
let object = value.as_object().unwrap();
|
||||
for (key, value) in object {
|
||||
let string_value = value.as_str().unwrap();
|
||||
hashmap.insert(key.to_string(), string_value.to_string());
|
||||
}
|
||||
let object = value.as_object().unwrap();
|
||||
for (key, value) in object {
|
||||
let string_value = value.as_str().unwrap();
|
||||
hashmap.insert(key.to_string(), string_value.to_string());
|
||||
}
|
||||
|
||||
hashmap
|
||||
hashmap
|
||||
}
|
||||
|
||||
pub fn query_to_json_string(query: &Query) -> Result<String, serde_json::Error> {
|
||||
let mut query_as_json = HashMap::new();
|
||||
for (k, v) in query.iter() {
|
||||
let json_value: serde_json::Value =
|
||||
serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));
|
||||
let mut query_as_json = HashMap::new();
|
||||
for (k, v) in query.iter() {
|
||||
let json_value: serde_json::Value =
|
||||
serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));
|
||||
|
||||
query_as_json.insert(k, json_value);
|
||||
}
|
||||
query_as_json.insert(k, json_value);
|
||||
}
|
||||
|
||||
serde_json::to_string(&query_as_json)
|
||||
serde_json::to_string(&query_as_json)
|
||||
}
|
||||
|
@ -7,30 +7,34 @@ use crate::messages;
|
||||
use crate::table_source::TableSources;
|
||||
|
||||
pub struct WorkerActor {
|
||||
pub table_sources: Rc<RefCell<Option<TableSources>>>,
|
||||
pub function_sources: Rc<RefCell<Option<FunctionSources>>>,
|
||||
pub table_sources: Rc<RefCell<Option<TableSources>>>,
|
||||
pub function_sources: Rc<RefCell<Option<FunctionSources>>>,
|
||||
}
|
||||
|
||||
impl Actor for WorkerActor {
|
||||
type Context = Context<Self>;
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
impl Handler<messages::RefreshTableSources> for WorkerActor {
|
||||
type Result = ();
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: messages::RefreshTableSources, _: &mut Context<Self>) -> Self::Result {
|
||||
*self.table_sources.borrow_mut() = msg.table_sources;
|
||||
}
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: messages::RefreshTableSources,
|
||||
_: &mut Context<Self>,
|
||||
) -> Self::Result {
|
||||
*self.table_sources.borrow_mut() = msg.table_sources;
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<messages::RefreshFunctionSources> for WorkerActor {
|
||||
type Result = ();
|
||||
type Result = ();
|
||||
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: messages::RefreshFunctionSources,
|
||||
_: &mut Context<Self>,
|
||||
) -> Self::Result {
|
||||
*self.function_sources.borrow_mut() = msg.function_sources;
|
||||
}
|
||||
fn handle(
|
||||
&mut self,
|
||||
msg: messages::RefreshFunctionSources,
|
||||
_: &mut Context<Self>,
|
||||
) -> Self::Result {
|
||||
*self.function_sources.borrow_mut() = msg.function_sources;
|
||||
}
|
||||
}
|
||||
|
@ -9,94 +9,94 @@ use martin::table_source::TableSources;
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_get_table_sources_ok() {
|
||||
let state = mock_state(mock_table_sources(), None);
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
let state = mock_state(mock_table_sources(), None);
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
|
||||
let req = test::TestRequest::get().uri("/index.json").to_request();
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
let req = test::TestRequest::get().uri("/index.json").to_request();
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let body = test::read_body(response).await;
|
||||
let table_sources: TableSources = serde_json::from_slice(&body).unwrap();
|
||||
assert!(table_sources.contains_key("public.table_source"));
|
||||
let body = test::read_body(response).await;
|
||||
let table_sources: TableSources = serde_json::from_slice(&body).unwrap();
|
||||
assert!(table_sources.contains_key("public.table_source"));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_get_table_source_ok() {
|
||||
let state = mock_state(mock_table_sources(), None);
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
let state = mock_state(mock_table_sources(), None);
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.non_existant.json")
|
||||
.to_request();
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.non_existant.json")
|
||||
.to_request();
|
||||
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert_eq!(response.status(), http::StatusCode::NOT_FOUND);
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert_eq!(response.status(), http::StatusCode::NOT_FOUND);
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.table_source.json")
|
||||
.to_request();
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.table_source.json")
|
||||
.to_request();
|
||||
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_get_table_source_tile_ok() {
|
||||
let state = mock_state(mock_table_sources(), None);
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
let state = mock_state(mock_table_sources(), None);
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.table_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/public.table_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_get_function_sources_ok() {
|
||||
let state = mock_state(None, mock_function_sources());
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
let state = mock_state(None, mock_function_sources());
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
|
||||
let req = test::TestRequest::get().uri("/rpc/index.json").to_request();
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
let req = test::TestRequest::get().uri("/rpc/index.json").to_request();
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
|
||||
let body = test::read_body(response).await;
|
||||
let function_sources: FunctionSources = serde_json::from_slice(&body).unwrap();
|
||||
assert!(function_sources.contains_key("public.function_source"));
|
||||
let body = test::read_body(response).await;
|
||||
let function_sources: FunctionSources = serde_json::from_slice(&body).unwrap();
|
||||
assert!(function_sources.contains_key("public.function_source"));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_get_function_source_ok() {
|
||||
let state = mock_state(None, mock_function_sources());
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
let state = mock_state(None, mock_function_sources());
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.non_existant.json")
|
||||
.to_request();
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.non_existant.json")
|
||||
.to_request();
|
||||
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert_eq!(response.status(), http::StatusCode::NOT_FOUND);
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert_eq!(response.status(), http::StatusCode::NOT_FOUND);
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.function_source.json")
|
||||
.to_request();
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.function_source.json")
|
||||
.to_request();
|
||||
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_get_function_source_tile_ok() {
|
||||
let state = mock_state(None, mock_function_sources());
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
let state = mock_state(None, mock_function_sources());
|
||||
let mut app = test::init_service(App::new().data(state).configure(router)).await;
|
||||
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.function_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
let req = test::TestRequest::get()
|
||||
.uri("/rpc/public.function_source/0/0/0.pbf")
|
||||
.to_request();
|
||||
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
let response = test::call_service(&mut app, req).await;
|
||||
assert!(response.status().is_success());
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user