chore: some refactoring of postgres code (#1166)

Move a few functions around for consistency and rename some files. No
functional changes. Some log output will differ slightly because the
module names have changed.
This commit is contained in:
Yuri Astrakhan 2024-01-31 20:17:32 -05:00 committed by GitHub
parent d39d602ee4
commit 955b8da9c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 135 additions and 122 deletions

View File

@ -9,12 +9,10 @@ use crate::args::BoundsCalcType;
use crate::pg::config::{PgConfig, PgInfo}; use crate::pg::config::{PgConfig, PgInfo};
use crate::pg::config_function::{FuncInfoSources, FunctionInfo}; use crate::pg::config_function::{FuncInfoSources, FunctionInfo};
use crate::pg::config_table::{TableInfo, TableInfoSources}; use crate::pg::config_table::{TableInfo, TableInfoSources};
use crate::pg::function_source::{merge_func_info, query_available_function};
use crate::pg::pg_source::{PgSource, PgSqlInfo}; use crate::pg::pg_source::{PgSource, PgSqlInfo};
use crate::pg::pool::PgPool; use crate::pg::pool::PgPool;
use crate::pg::table_source::{ use crate::pg::query_functions::query_available_function;
calc_srid, merge_table_info, query_available_tables, table_to_query, use crate::pg::query_tables::{query_available_tables, table_to_query};
};
use crate::pg::utils::{find_info, find_kv_ignore_case, normalize_key, InfoMap}; use crate::pg::utils::{find_info, find_kv_ignore_case, normalize_key, InfoMap};
use crate::pg::PgError::InvalidTableExtent; use crate::pg::PgError::InvalidTableExtent;
use crate::pg::{PgCfgPublish, PgCfgPublishFuncs, PgResult}; use crate::pg::{PgCfgPublish, PgCfgPublishFuncs, PgResult};
@ -26,6 +24,20 @@ use crate::OptBoolObj::{Bool, NoValue, Object};
pub type SqlFuncInfoMapMap = InfoMap<InfoMap<(PgSqlInfo, FunctionInfo)>>; pub type SqlFuncInfoMapMap = InfoMap<InfoMap<(PgSqlInfo, FunctionInfo)>>;
pub type SqlTableInfoMapMapMap = InfoMap<InfoMap<InfoMap<TableInfo>>>; pub type SqlTableInfoMapMapMap = InfoMap<InfoMap<InfoMap<TableInfo>>>;
/// A builder for creating a set of sources from a Postgres database
///
/// Holds the database handle together with the settings that are consulted
/// while table and function sources are being resolved.
#[derive(Debug)]
pub struct PgBuilder {
/// Handle to the Postgres database (a `PgPool`)
pool: PgPool,
/// Fallback SRID used when resolving the SRID of a table whose own SRID is 0
default_srid: Option<i32>,
/// NOTE(review): presumably selects how table bounds are computed -- confirm against `BoundsCalcType`
auto_bounds: BoundsCalcType,
/// NOTE(review): presumably an optional cap on the number of features -- confirm at the usage site
max_feature_count: Option<usize>,
/// Auto-publish settings for functions; NOTE(review): `None` presumably disables function auto-publishing -- confirm
auto_functions: Option<PgBuilderFuncs>,
/// Auto-publish settings for tables; NOTE(review): `None` presumably disables table auto-publishing -- confirm
auto_tables: Option<PgBuilderTables>,
/// NOTE(review): presumably backs `resolve_id` when final source IDs are produced -- confirm
id_resolver: IdResolver,
/// Table sources; NOTE(review): presumably the ones listed explicitly in the config -- confirm
tables: TableInfoSources,
/// Function sources; NOTE(review): presumably the ones listed explicitly in the config -- confirm
functions: FuncInfoSources,
}
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
#[cfg_attr(test, serde_with::skip_serializing_none, derive(serde::Serialize))] #[cfg_attr(test, serde_with::skip_serializing_none, derive(serde::Serialize))]
pub struct PgBuilderFuncs { pub struct PgBuilderFuncs {
@ -45,19 +57,6 @@ pub struct PgBuilderTables {
extent: Option<u32>, extent: Option<u32>,
} }
// Collects the database handle and the settings consulted while table and
// function sources are being resolved.
#[derive(Debug)]
pub struct PgBuilder {
// Handle to the Postgres database (a `PgPool`)
pool: PgPool,
// Fallback SRID used when resolving the SRID of a table whose own SRID is 0
default_srid: Option<i32>,
// NOTE(review): presumably selects how table bounds are computed -- confirm against `BoundsCalcType`
auto_bounds: BoundsCalcType,
// NOTE(review): presumably an optional cap on the number of features -- confirm at the usage site
max_feature_count: Option<usize>,
// Auto-publish settings for functions; NOTE(review): `None` presumably disables it -- confirm
auto_functions: Option<PgBuilderFuncs>,
// Auto-publish settings for tables; NOTE(review): `None` presumably disables it -- confirm
auto_tables: Option<PgBuilderTables>,
// NOTE(review): presumably backs `resolve_id` when final source IDs are produced -- confirm
id_resolver: IdResolver,
// Table sources; NOTE(review): presumably the ones listed explicitly in the config -- confirm
tables: TableInfoSources,
// Function sources; NOTE(review): presumably the ones listed explicitly in the config -- confirm
functions: FuncInfoSources,
}
/// Combine `from_schema` field from the `config.auto_publish` and `config.auto_publish.tables/functions` /// Combine `from_schema` field from the `config.auto_publish` and `config.auto_publish.tables/functions`
macro_rules! get_auto_schemas { macro_rules! get_auto_schemas {
($config:expr, $typ:ident) => { ($config:expr, $typ:ident) => {
@ -140,8 +139,7 @@ impl PgBuilder {
let dup = if dup { "duplicate " } else { "" }; let dup = if dup { "duplicate " } else { "" };
let id2 = self.resolve_id(id, cfg_inf); let id2 = self.resolve_id(id, cfg_inf);
let Some(merged_inf) = merge_table_info(self.default_srid, &id2, cfg_inf, db_inf) let Some(merged_inf) = db_inf.append_cfg_info(cfg_inf, &id2, self.default_srid) else {
else {
continue; continue;
}; };
warn_on_rename(id, &id2, "Table"); warn_on_rename(id, &id2, "Table");
@ -184,9 +182,7 @@ impl PgBuilder {
.replace("{table}", &table) .replace("{table}", &table)
.replace("{column}", &geom_column); .replace("{column}", &geom_column);
let id2 = self.resolve_id(&source_id, &db_inf); let id2 = self.resolve_id(&source_id, &db_inf);
let Some(srid) = let Some(srid) = db_inf.calc_srid(&id2, 0, self.default_srid) else {
calc_srid(&db_inf.format_id(), &id2, db_inf.srid, 0, self.default_srid)
else {
continue; continue;
}; };
db_inf.srid = srid; db_inf.srid = srid;
@ -243,7 +239,7 @@ impl PgBuilder {
continue; continue;
}; };
let merged_inf = merge_func_info(cfg_inf, db_inf); let merged_inf = db_inf.append_cfg_info(cfg_inf);
let dup = !used.insert((&cfg_inf.schema, func_name)); let dup = !used.insert((&cfg_inf.schema, func_name));
let dup = if dup { "duplicate " } else { "" }; let dup = if dup { "duplicate " } else { "" };

View File

@ -8,9 +8,9 @@ use tilejson::TileJSON;
use crate::args::{BoundsCalcType, DEFAULT_BOUNDS_TIMEOUT}; use crate::args::{BoundsCalcType, DEFAULT_BOUNDS_TIMEOUT};
use crate::config::{copy_unrecognized_config, UnrecognizedValues}; use crate::config::{copy_unrecognized_config, UnrecognizedValues};
use crate::pg::builder::PgBuilder;
use crate::pg::config_function::FuncInfoSources; use crate::pg::config_function::FuncInfoSources;
use crate::pg::config_table::TableInfoSources; use crate::pg::config_table::TableInfoSources;
use crate::pg::configurator::PgBuilder;
use crate::pg::utils::on_slow; use crate::pg::utils::on_slow;
use crate::pg::PgResult; use crate::pg::PgResult;
use crate::source::TileInfoSources; use crate::source::TileInfoSources;

View File

@ -83,3 +83,15 @@ impl PgInfo for FunctionInfo {
patch_json(tilejson, self.tilejson.as_ref()) patch_json(tilejson, self.tilejson.as_ref())
} }
} }
impl FunctionInfo {
/// For a given function info discovered from the database, append the configuration info provided by the user
#[must_use]
pub fn append_cfg_info(&self, cfg_inf: &FunctionInfo) -> FunctionInfo {
FunctionInfo {
// TileJson does not need to be merged because it cannot be de-serialized from config
tilejson: self.tilejson.clone(),
..cfg_inf.clone()
}
}
}

View File

@ -1,11 +1,12 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use log::{info, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tilejson::{Bounds, TileJSON, VectorLayer}; use tilejson::{Bounds, TileJSON, VectorLayer};
use crate::config::UnrecognizedValues; use crate::config::UnrecognizedValues;
use crate::pg::config::PgInfo; use crate::pg::config::PgInfo;
use crate::pg::utils::{patch_json, InfoMap}; use crate::pg::utils::{normalize_key, patch_json, InfoMap};
pub type TableInfoSources = InfoMap<TableInfo>; pub type TableInfoSources = InfoMap<TableInfo>;
@ -103,3 +104,86 @@ impl PgInfo for TableInfo {
patch_json(tilejson, self.tilejson.as_ref()) patch_json(tilejson, self.tilejson.as_ref())
} }
} }
impl TableInfo {
    /// Layer the user-provided configuration (`cfg_inf`) on top of this table info,
    /// which is assumed to have been discovered from the database.
    ///
    /// `new_id` is the source ID used in log messages. Returns `None` when the SRID
    /// cannot be resolved (see [`Self::calc_srid`]) or when `normalize_key` yields
    /// `None` for the configured `id_column` or for any configured property.
    #[must_use]
    pub fn append_cfg_info(
        &self,
        cfg_inf: &TableInfo,
        new_id: &String,
        default_srid: Option<i32>,
    ) -> Option<Self> {
        // Srid requires some logic; resolve it first so an unusable table is rejected right away
        let srid = self.calc_srid(new_id, cfg_inf.srid, default_srid)?;

        // A geometry type mismatch is only reported, it does not reject the table
        if let (Some(src), Some(cfg)) = (&self.geometry_type, &cfg_inf.geometry_type) {
            if src != cfg {
                warn!(
                    r#"Table {} has geometry type={src}, but source {new_id} has {cfg}"#,
                    self.format_id()
                );
            }
        }

        // Map every configured column name to the matching database property
        let empty = BTreeMap::new();
        let props = self.properties.as_ref().unwrap_or(&empty);
        let mut prop_mapping = HashMap::new();
        if let Some(id_column) = &cfg_inf.id_column {
            let db_name = normalize_key(props, id_column.as_str(), "id_column", new_id)?;
            prop_mapping.insert(id_column.clone(), db_name);
        }
        for key in cfg_inf.properties.iter().flat_map(|p| p.keys()) {
            let db_name = normalize_key(props, key.as_str(), "property", new_id)?;
            prop_mapping.insert(key.clone(), db_name);
        }

        // Assume cfg_inf and self have the same schema/table/geometry_column
        Some(TableInfo {
            // These values must match the database exactly
            schema: self.schema.clone(),
            table: self.table.clone(),
            geometry_column: self.geometry_column.clone(),
            // These values are not serialized, so copy auto-detected values from the database
            geometry_index: self.geometry_index,
            is_view: self.is_view,
            tilejson: self.tilejson.clone(),
            srid,
            prop_mapping,
            // Everything else comes from the user's configuration
            ..cfg_inf.clone()
        })
    }

    /// Determine the SRID value to use for a table, assuming `self` is a table info
    /// from the database.
    ///
    /// Returns `None` when neither the table, the config, nor `default_srid` provides
    /// an SRID, or when the table and the config specify different non-zero SRIDs.
    #[must_use]
    pub fn calc_srid(&self, new_id: &str, cfg_srid: i32, default_srid: Option<i32>) -> Option<i32> {
        let (src, cfg) = (self.srid, cfg_srid);

        if src == 0 && cfg == 0 {
            // Neither the database nor the config knows the SRID: the default is the only option
            let Some(default_srid) = default_srid else {
                let info = "To use this table source, set default or specify this table SRID in the config file, or set the default SRID with --default-srid=...";
                warn!("Table {} has SRID=0, skipping. {info}", self.format_id());
                return None;
            };
            info!(
                "Table {} has SRID=0, using provided default SRID={default_srid}",
                self.format_id()
            );
            return Some(default_srid);
        }
        if src == 0 {
            // Use the configured SRID
            return Some(cfg);
        }
        if cfg == 0 || src == cfg {
            // Use the source SRID (identical to the configured one when both are set)
            return Some(src);
        }

        warn!(
            "Table {} has SRID={src}, but source {new_id} has SRID={cfg}",
            self.format_id()
        );
        None
    }
}

View File

@ -1,12 +1,12 @@
mod builder;
mod config; mod config;
mod config_function; mod config_function;
mod config_table; mod config_table;
mod configurator;
mod errors; mod errors;
mod function_source;
mod pg_source; mod pg_source;
mod pool; mod pool;
mod table_source; mod query_functions;
mod query_tables;
mod tls; mod tls;
mod utils; mod utils;
@ -14,5 +14,5 @@ pub use config::{PgCfgPublish, PgCfgPublishFuncs, PgCfgPublishTables, PgConfig,
pub use config_function::FunctionInfo; pub use config_function::FunctionInfo;
pub use config_table::TableInfo; pub use config_table::TableInfo;
pub use errors::{PgError, PgResult}; pub use errors::{PgError, PgResult};
pub use function_source::query_available_function;
pub use pool::{PgPool, POOL_SIZE_DEFAULT}; pub use pool::{PgPool, POOL_SIZE_DEFAULT};
pub use query_functions::query_available_function;

View File

@ -5,8 +5,8 @@ use log::{debug, warn};
use postgres_protocol::escape::escape_identifier; use postgres_protocol::escape::escape_identifier;
use serde_json::Value; use serde_json::Value;
use crate::pg::builder::SqlFuncInfoMapMap;
use crate::pg::config_function::FunctionInfo; use crate::pg::config_function::FunctionInfo;
use crate::pg::configurator::SqlFuncInfoMapMap;
use crate::pg::pg_source::PgSqlInfo; use crate::pg::pg_source::PgSqlInfo;
use crate::pg::pool::PgPool; use crate::pg::pool::PgPool;
use crate::pg::PgError::PostgresError; use crate::pg::PgError::PostgresError;
@ -62,7 +62,7 @@ pub async fn query_available_function(pool: &PgPool) -> PgResult<SqlFuncInfoMapM
// Query preparation: the schema and function can't be part of a prepared query, so they // Query preparation: the schema and function can't be part of a prepared query, so they
// need to be escaped by hand. // need to be escaped by hand.
// However schema and function comes from database introspection so they should be safe. // However, schema and function comes from database introspection, so they should be safe.
let mut query = String::new(); let mut query = String::new();
query.push_str(&escape_identifier(&schema)); query.push_str(&escape_identifier(&schema));
query.push('.'); query.push('.');
@ -118,14 +118,6 @@ pub async fn query_available_function(pool: &PgPool) -> PgResult<SqlFuncInfoMapM
Ok(res) Ok(res)
} }
pub fn merge_func_info(cfg_inf: &FunctionInfo, db_inf: &FunctionInfo) -> FunctionInfo {
FunctionInfo {
// TileJson does not need to be merged because it cannot be de-serialized from config
tilejson: db_inf.tilejson.clone(),
..cfg_inf.clone()
}
}
fn jsonb_to_vec(jsonb: Option<Value>) -> Option<Vec<String>> { fn jsonb_to_vec(jsonb: Option<Value>) -> Option<Vec<String>> {
jsonb.map(|json| { jsonb.map(|json| {
json.as_array() json.as_array()

View File

@ -1,7 +1,7 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::HashMap;
use futures::pin_mut; use futures::pin_mut;
use log::{debug, info, warn}; use log::{debug, warn};
use postgis::ewkb; use postgis::ewkb;
use postgres_protocol::escape::{escape_identifier, escape_literal}; use postgres_protocol::escape::{escape_identifier, escape_literal};
use serde_json::Value; use serde_json::Value;
@ -9,12 +9,12 @@ use tilejson::Bounds;
use tokio::time::timeout; use tokio::time::timeout;
use crate::args::{BoundsCalcType, DEFAULT_BOUNDS_TIMEOUT}; use crate::args::{BoundsCalcType, DEFAULT_BOUNDS_TIMEOUT};
use crate::pg::builder::SqlTableInfoMapMapMap;
use crate::pg::config::PgInfo; use crate::pg::config::PgInfo;
use crate::pg::config_table::TableInfo; use crate::pg::config_table::TableInfo;
use crate::pg::configurator::SqlTableInfoMapMapMap;
use crate::pg::pg_source::PgSqlInfo; use crate::pg::pg_source::PgSqlInfo;
use crate::pg::pool::PgPool; use crate::pg::pool::PgPool;
use crate::pg::utils::{json_to_hashmap, normalize_key, polygon_to_bbox}; use crate::pg::utils::{json_to_hashmap, polygon_to_bbox};
use crate::pg::PgError::PostgresError; use crate::pg::PgError::PostgresError;
use crate::pg::PgResult; use crate::pg::PgResult;
@ -22,6 +22,7 @@ static DEFAULT_EXTENT: u32 = 4096;
static DEFAULT_BUFFER: u32 = 64; static DEFAULT_BUFFER: u32 = 64;
static DEFAULT_CLIP_GEOM: bool = true; static DEFAULT_CLIP_GEOM: bool = true;
/// Examine a database to get a list of all tables that have geometry columns.
pub async fn query_available_tables(pool: &PgPool) -> PgResult<SqlTableInfoMapMapMap> { pub async fn query_available_tables(pool: &PgPool) -> PgResult<SqlTableInfoMapMapMap> {
let conn = pool.get().await?; let conn = pool.get().await?;
let rows = conn let rows = conn
@ -82,6 +83,8 @@ pub async fn query_available_tables(pool: &PgPool) -> PgResult<SqlTableInfoMapMa
Ok(res) Ok(res)
} }
/// Generate an SQL snippet to escape a column name, and optionally alias it.
/// Assumes to not be the first column in a SELECT statement.
fn escape_with_alias(mapping: &HashMap<String, String>, field: &str) -> String { fn escape_with_alias(mapping: &HashMap<String, String>, field: &str) -> String {
let column = mapping.get(field).map_or(field, |v| v.as_str()); let column = mapping.get(field).map_or(field, |v| v.as_str());
if field == column { if field == column {
@ -95,6 +98,8 @@ fn escape_with_alias(mapping: &HashMap<String, String>, field: &str) -> String {
} }
} }
/// Generate a query to fetch tiles from a table.
/// The function is async because it may need to query the database for the table bounds (could be very slow).
pub async fn table_to_query( pub async fn table_to_query(
id: String, id: String,
mut info: TableInfo, mut info: TableInfo,
@ -193,6 +198,7 @@ FROM (
Ok((id, PgSqlInfo::new(query, false, info.format_id()), info)) Ok((id, PgSqlInfo::new(query, false, info.format_id()), info))
} }
/// Compute the bounds of a table. This could be slow if the table is large or has no geo index.
async fn calc_bounds( async fn calc_bounds(
pool: &PgPool, pool: &PgPool,
schema: &str, schema: &str,
@ -220,80 +226,3 @@ FROM {schema}.{table};
.get::<_, Option<ewkb::Polygon>>("bounds") .get::<_, Option<ewkb::Polygon>>("bounds")
.and_then(|p| polygon_to_bbox(&p))) .and_then(|p| polygon_to_bbox(&p)))
} }
/// Combine the user-provided table configuration (`cfg_inf`) with the info
/// discovered from the database (`db_inf`).
///
/// `new_id` is the source ID used in log messages. Returns `None` when `calc_srid`
/// cannot resolve an SRID or when `normalize_key` yields `None` for the configured
/// `id_column` or for any configured property.
#[must_use]
pub fn merge_table_info(
    default_srid: Option<i32>,
    new_id: &String,
    cfg_inf: &TableInfo,
    db_inf: &TableInfo,
) -> Option<TableInfo> {
    let table_id = db_inf.format_id();

    // Srid requires some logic; resolve it first so an unusable table is rejected right away
    let srid = calc_srid(&table_id, new_id, db_inf.srid, cfg_inf.srid, default_srid)?;

    // A geometry type mismatch is only reported, it does not reject the table
    if let (Some(src), Some(cfg)) = (&db_inf.geometry_type, &cfg_inf.geometry_type) {
        if src != cfg {
            warn!(r#"Table {table_id} has geometry type={src}, but source {new_id} has {cfg}"#);
        }
    }

    // Map every configured column name to the matching database property
    let empty = BTreeMap::new();
    let props = db_inf.properties.as_ref().unwrap_or(&empty);
    let mut prop_mapping = HashMap::new();
    if let Some(id_column) = &cfg_inf.id_column {
        let db_name = normalize_key(props, id_column.as_str(), "id_column", new_id)?;
        prop_mapping.insert(id_column.clone(), db_name);
    }
    for key in cfg_inf.properties.iter().flat_map(|p| p.keys()) {
        let db_name = normalize_key(props, key.as_str(), "property", new_id)?;
        prop_mapping.insert(key.clone(), db_name);
    }

    // Assume cfg_inf and db_inf have the same schema/table/geometry_column
    Some(TableInfo {
        // These values must match the database exactly
        schema: db_inf.schema.clone(),
        table: db_inf.table.clone(),
        geometry_column: db_inf.geometry_column.clone(),
        // These values are not serialized, so copy auto-detected values from the database
        geometry_index: db_inf.geometry_index,
        is_view: db_inf.is_view,
        tilejson: db_inf.tilejson.clone(),
        srid,
        prop_mapping,
        // Everything else comes from the user's configuration
        ..cfg_inf.clone()
    })
}
/// Determine the SRID to use for a table from the database SRID (`db_srid`), the
/// configured SRID (`cfg_srid`), and an optional default.
///
/// `table_id` and `new_id` are only used in log messages. Returns `None` when all
/// three sources are missing (zero / `None`), or when the database and the config
/// specify different non-zero SRIDs.
#[must_use]
pub fn calc_srid(
    table_id: &str,
    new_id: &str,
    db_srid: i32,
    cfg_srid: i32,
    default_srid: Option<i32>,
) -> Option<i32> {
    let (src, cfg) = (db_srid, cfg_srid);

    if src == 0 && cfg == 0 {
        // Neither the database nor the config knows the SRID: the default is the only option
        let Some(default_srid) = default_srid else {
            let info = "To use this table source, set default or specify this table SRID in the config file, or set the default SRID with --default-srid=...";
            warn!("Table {table_id} has SRID=0, skipping. {info}");
            return None;
        };
        info!("Table {table_id} has SRID=0, using provided default SRID={default_srid}");
        return Some(default_srid);
    }
    if src == 0 {
        // Use the configured SRID
        return Some(cfg);
    }
    if cfg == 0 || src == cfg {
        // Use the source SRID (identical to the configured one when both are set)
        return Some(src);
    }

    warn!("Table {table_id} has SRID={src}, but source {new_id} has SRID={cfg}");
    None
}

View File

@ -297,7 +297,7 @@ test_jsn fnc_comment function_Mixed_Name
kill_process $MARTIN_PROC_ID Martin kill_process $MARTIN_PROC_ID Martin
test_log_has_str "$LOG_FILE" 'WARN martin::pg::table_source] Table public.table_source has no spatial index on column geom' test_log_has_str "$LOG_FILE" 'WARN martin::pg::query_tables] Table public.table_source has no spatial index on column geom'
test_log_has_str "$LOG_FILE" 'WARN martin::fonts] Ignoring duplicate font Overpass Mono Regular from tests' test_log_has_str "$LOG_FILE" 'WARN martin::fonts] Ignoring duplicate font Overpass Mono Regular from tests'
validate_log "$LOG_FILE" validate_log "$LOG_FILE"
remove_line "${TEST_OUT_DIR}/save_config.yaml" " connection_string: " remove_line "${TEST_OUT_DIR}/save_config.yaml" " connection_string: "
@ -380,7 +380,7 @@ test_jsn tbl_comment_cfg MixPoints
test_jsn fnc_comment_cfg fnc_Mixed_Name test_jsn fnc_comment_cfg fnc_Mixed_Name
kill_process $MARTIN_PROC_ID Martin kill_process $MARTIN_PROC_ID Martin
test_log_has_str "$LOG_FILE" 'WARN martin::pg::table_source] Table public.table_source has no spatial index on column geom' test_log_has_str "$LOG_FILE" 'WARN martin::pg::query_tables] Table public.table_source has no spatial index on column geom'
test_log_has_str "$LOG_FILE" 'WARN martin::fonts] Ignoring duplicate font Overpass Mono Regular from tests' test_log_has_str "$LOG_FILE" 'WARN martin::fonts] Ignoring duplicate font Overpass Mono Regular from tests'
validate_log "$LOG_FILE" validate_log "$LOG_FILE"
remove_line "${TEST_OUT_DIR}/save_config.yaml" " connection_string: " remove_line "${TEST_OUT_DIR}/save_config.yaml" " connection_string: "