diff --git a/v3/Cargo.lock b/v3/Cargo.lock index 8d7bae9eea5..a5ad04f60fe 100644 --- a/v3/Cargo.lock +++ b/v3/Cargo.lock @@ -24,6 +24,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "const-random", "getrandom", "once_cell", "version_check", @@ -39,6 +40,27 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -126,6 +148,239 @@ dependencies = [ "thiserror", ] +[[package]] +name = "arrayref" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" + +[[package]] +name = "arrayvec" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" + +[[package]] +name = "arrow" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ae9728f104939be6d8d9b368a354b4929b0569160ea1641f0721b55a861ce38" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7029a5b3efbeafbf4a12d12dc16b8f9e9bff20a410b8c25c5d28acc089e1043" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "num", +] + +[[package]] +name = "arrow-array" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d33238427c60271710695f17742f45b1a5dc5bcfc5c15331c25ddfe7abf70d97" +dependencies = [ + "ahash", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "chrono-tz", + "half", + "hashbrown 0.14.5", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe9b95e825ae838efaf77e366c00d3fc8cca78134c9db497d6bda425f2e7b7c1" +dependencies = [ + "bytes", + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cf8385a9d5b5fcde771661dd07652b79b9139fea66193eda6a88664400ccab" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "atoi", + "base64 0.22.1", + "chrono", + "comfy-table", + "half", + "lexical-core", + "num", + "ryu", +] + +[[package]] +name = "arrow-csv" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cea5068bef430a86690059665e40034625ec323ffa4dd21972048eebb0127adc" 
+dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "csv", + "csv-core", + "lazy_static", + "lexical-core", + "regex", +] + +[[package]] +name = "arrow-data" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb29be98f987bcf217b070512bb7afba2f65180858bca462edf4a39d84a23e10" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc68f6523970aa6f7ce1dc9a33a7d9284cfb9af77d4ad3e617dbe5d79cc6ec8" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", + "lz4_flex", +] + +[[package]] +name = "arrow-json" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2041380f94bd6437ab648e6c2085a045e45a0c44f91a1b9a4fe3fed3d379bfb1" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "indexmap 2.2.6", + "lexical-core", + "num", + "serde", + "serde_json", +] + +[[package]] +name = "arrow-ord" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb56ed1547004e12203652f12fe12e824161ff9d1e5cf2a7dc4ff02ba94f413" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "half", + "num", +] + +[[package]] +name = "arrow-row" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "575b42f1fc588f2da6977b94a5ca565459f5ab07b60545e17243fb9a7ed6d43e" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", + "hashbrown 0.14.5", +] + +[[package]] +name = "arrow-schema" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32aae6a60458a2389c0da89c9de0b7932427776127da1a738e2efc21d32f3393" +dependencies = [ + "serde", +] + +[[package]] +name = "arrow-select" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de36abaef8767b4220d7b4a8c2fe5ffc78b47db81b03d77e2136091c3ba39102" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e435ada8409bcafc910bc3e0077f532a4daa20e99060a496685c0e3e53cc2597" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num", + "regex", + "regex-syntax", +] + [[package]] name = "ascii" version = "0.9.3" @@ -142,6 +397,24 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-compression" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd066d0b4ef8ecb03a55319dc13aa6910616d0f44008a045bb1835af830abff5" +dependencies = [ + "bzip2", + "flate2", + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", + "tokio", + "xz2", + "zstd", + "zstd-safe", +] + [[package]] name = "async-graphql-parser" version = "7.0.6" @@ -174,7 +447,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -196,7 +469,7 @@ checksum = 
"16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -207,7 +480,16 @@ checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", +] + +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", ] [[package]] @@ -324,9 +606,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "bitvec" @@ -340,6 +622,28 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest 0.10.7", +] + +[[package]] +name = "blake3" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cca6d3674597c30ddf2c587bf8d9d65c9a84d2326d941cc79c9842dfe0ef52" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -358,6 +662,27 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bson" version = "2.11.0" @@ -421,6 +746,27 @@ dependencies = [ "serde", ] +[[package]] +name = "bzip2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cast" version = "0.3.0" @@ -429,9 +775,14 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.99" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96c51067fd44124faa7f870b4b1c969379ad32b2ba805aa959430ceaa384f695" +checksum = "c891175c3fb232128f48de6590095e59198bbeb8620c310be349bfc3afd12c7b" +dependencies = [ + "jobserver", + "libc", + "once_cell", +] [[package]] name = "cfg-if" @@ -452,6 +803,28 @@ dependencies = [ "windows-targets 0.52.5", ] +[[package]] +name = "chrono-tz" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "ciborium" version = "0.2.2" @@ -507,10 +880,10 @@ version = "4.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c780290ccf4fb26629baa7a1081e68ced113f1d3ec302fa5948f1c381ebf06c6" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -544,6 +917,17 @@ dependencies = [ "unreachable", ] +[[package]] +name = "comfy-table" +version = "7.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" +dependencies = [ + "strum", + "strum_macros", + "unicode-width", +] + [[package]] name = "console" version = "0.15.8" @@ -562,6 +946,32 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + +[[package]] +name = "constant_time_eq" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" + [[package]] name = "convert_case" version = "0.4.0" @@ -618,6 +1028,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + [[package]] name = "criterion" version = "0.5.1" @@ -734,6 +1153,27 @@ dependencies = [ "subtle", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "custom-connector" version = "0.1.0" @@ -772,7 +1212,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.68", ] [[package]] @@ -783,7 +1223,323 @@ checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" dependencies = [ "darling_core", "quote", - "syn", + "syn 2.0.68", +] + +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + 
"once_cell", + "parking_lot_core", +] + +[[package]] +name = "datafusion" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f92d2d7a9cba4580900b32b009848d9eb35f1028ac84cdd6ddcf97612cd0068" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-ipc", + "arrow-schema", + "async-compression", + "async-trait", + "bytes", + "bzip2", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-common-runtime", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions", + "datafusion-functions-aggregate", + "datafusion-functions-array", + "datafusion-optimizer", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "datafusion-physical-plan", + "datafusion-sql", + "flate2", + "futures", + "glob", + "half", + "hashbrown 0.14.5", + "indexmap 2.2.6", + "itertools 0.12.1", + "log", + "num_cpus", + "object_store", + "parking_lot", + "parquet", + "paste", + "pin-project-lite", + "rand", + "sqlparser", + "tempfile", + "tokio", + "tokio-util", + "url", + "uuid", + "xz2", + "zstd", +] + +[[package]] +name = "datafusion-common" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "effed030d2c1667eb1e11df5372d4981eaf5d11a521be32220b3985ae5ba6971" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.14.5", + "instant", + "libc", + "num_cpus", + "object_store", + "parquet", + "sqlparser", +] + +[[package]] +name = "datafusion-common-runtime" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0091318129dad1359f08e4c6c71f855163c35bba05d1dbf983196f727857894" +dependencies = [ + "tokio", +] + +[[package]] +name = "datafusion-execution" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8385aba84fc4a06d3ebccfbcbf9b4f985e80c762fac634b49079f7cc14933fb1" +dependencies = [ + "arrow", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-expr", + "futures", + "hashbrown 0.14.5", + "log", + "object_store", + "parking_lot", + "rand", + "tempfile", + "url", +] + +[[package]] +name = "datafusion-expr" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebb192f0055d2ce64e38ac100abc18e4e6ae9734d3c28eee522bbbd6a32108a3" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "chrono", + "datafusion-common", + "paste", + "serde_json", + "sqlparser", + "strum", + "strum_macros", +] + +[[package]] +name = "datafusion-functions" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c081ae5b7edd712b92767fb8ed5c0e32755682f8075707666cd70835807c0b" +dependencies = [ + "arrow", + "base64 0.22.1", + "blake2", + "blake3", + "chrono", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "hashbrown 0.14.5", + "hex", + "itertools 0.12.1", + "log", + "md-5", + "rand", + "regex", + "sha2 0.10.8", + "unicode-segmentation", + "uuid", +] + +[[package]] +name = "datafusion-functions-aggregate" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feb28a4ea52c28a26990646986a27c4052829a2a2572386258679e19263f8b78" +dependencies = [ + "ahash", + "arrow", + "arrow-schema", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr-common", + "log", + "paste", + "sqlparser", +] + +[[package]] 
+name = "datafusion-functions-array" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b17c02a74cdc87380a56758ec27e7d417356bf806f33062700908929aedb8a" +dependencies = [ + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions", + "itertools 0.12.1", + "log", + "paste", +] + +[[package]] +name = "datafusion-optimizer" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12172f2a6c9eb4992a51e62d709eeba5dedaa3b5369cce37ff6c2260e100ba76" +dependencies = [ + "arrow", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr", + "hashbrown 0.14.5", + "indexmap 2.2.6", + "itertools 0.12.1", + "log", + "regex-syntax", +] + +[[package]] +name = "datafusion-physical-expr" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a3fce531b623e94180f6cd33d620ef01530405751b6ddd2fd96250cdbd78e2e" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", + "arrow-string", + "base64 0.22.1", + "chrono", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-aggregate", + "datafusion-physical-expr-common", + "half", + "hashbrown 0.14.5", + "hex", + "indexmap 2.2.6", + "itertools 0.12.1", + "log", + "paste", + "petgraph", + "regex", +] + +[[package]] +name = "datafusion-physical-expr-common" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046400b6a2cc3ed57a7c576f5ae6aecc77804ac8e0186926b278b189305b2a77" +dependencies = [ + "arrow", + "datafusion-common", + "datafusion-expr", + "rand", +] + +[[package]] +name = "datafusion-physical-plan" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4aed47f5a2ad8766260befb375b201592e86a08b260256e168ae4311426a2bff" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-common-runtime", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-aggregate", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "futures", + "half", + "hashbrown 0.14.5", + "indexmap 2.2.6", + "itertools 0.12.1", + "log", + "once_cell", + "parking_lot", + "pin-project-lite", + "rand", + "tokio", +] + +[[package]] +name = "datafusion-sql" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fa92bb1fd15e46ce5fb6f1c85f3ac054592560f294429a28e392b5f9cd4255e" +dependencies = [ + "arrow", + "arrow-array", + "arrow-schema", + "datafusion-common", + "datafusion-expr", + "log", + "regex", + "sqlparser", + "strum", ] [[package]] @@ -815,7 +1571,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 2.0.68", ] [[package]] @@ -864,6 +1620,7 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer 0.10.4", "crypto-common", + "subtle", ] [[package]] @@ -872,6 +1629,12 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59f8e79d1fbf76bdfbde321e902714bf6c49df88a7dda6fc682fc2979226962d" +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dyn-clone" version = "1.0.17" @@ -940,7 +1703,6 @@ dependencies = [ "anyhow", "axum", "base64 0.22.1", - "bincode", "build-data", "clap", "criterion", @@ -961,6 +1723,7 @@ dependencies = [ "serde", "serde_json", "serde_path_to_error", + "sql", "thiserror", "tokio", "tokio-test", @@ -1069,6 +1832,32 @@ dependencies = [ "subtle", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + +[[package]] +name = "flatbuffers" +version = "24.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8add37afff2d4ffa83bc748a70b4b1370984f6980768554182424ef71447c35f" +dependencies = [ + "bitflags 1.3.2", + "rustc_version", +] + +[[package]] +name = "flate2" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fluent-uri" version = "0.1.4" @@ -1122,6 +1911,7 @@ checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -1177,7 +1967,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -1198,6 +1988,7 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1302,6 +2093,7 @@ checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" dependencies = [ "cfg-if", "crunchy", + "num-traits", ] [[package]] @@ -1324,6 +2116,10 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "hasura-authn-core" @@ -1399,6 +2195,12 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -1601,6 +2403,24 @@ dependencies = [ "similar", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "ipnet" version = "2.9.0" @@ -1648,6 +2468,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.69" @@ -1835,6 +2664,12 @@ version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "libm" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" + [[package]] name = "linked-hash-map" version = "0.5.6" @@ -1863,12 +2698,42 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +dependencies = [ + "twox-hash", +] + +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "matchit" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "memchr" version = "2.7.4" @@ -2028,6 +2893,20 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + [[package]] name = "num-bigint" version = "0.4.5" @@ -2038,6 +2917,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-complex" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" +dependencies = [ + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -2053,6 +2941,28 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -2060,6 +2970,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -2081,6 +2992,27 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "humantime", + "itertools 0.12.1", + "parking_lot", + "percent-encoding", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "once_cell" version = "1.19.0" @@ -2131,7 +3063,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn", + "syn 2.0.68", "thiserror", ] @@ -2141,7 +3073,7 @@ version = "0.10.64" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.6.0", "cfg-if", "foreign-types", "libc", @@ -2158,7 +3090,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2288,7 +3220,7 @@ dependencies = [ "futures-util", "once_cell", "opentelemetry", - "ordered-float", + "ordered-float 4.2.0", "percent-encoding", "rand", "thiserror", @@ -2296,6 +3228,15 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "4.2.0" @@ -2345,6 +3286,51 @@ dependencies = [ "windows-targets 0.52.5", ] +[[package]] +name = "parquet" +version = "52.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c3b5322cc1bbf67f11c079c42be41a55949099b78732f7dba9e15edde40eab" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.22.1", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "half", + "hashbrown 0.14.5", + "lz4_flex", + "num", + "num-bigint", + "object_store", + "paste", + "seq-macro", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd", + "zstd-sys", +] + +[[package]] +name = "parse-zoneinfo" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f2a05b18d44e2957b88f96ba460715e295bc1d7510468a2f3d3b44535d26c24" +dependencies = [ + "regex", +] + [[package]] name = "paste" version = "1.0.15" @@ -2377,6 +3363,54 @@ dependencies = [ "ucd-trie", ] +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.2.6", +] + +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -2394,7 +3428,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2516,7 +3550,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2601,7 +3635,7 @@ name = "recursion_limit_macro" version = "0.1.0" dependencies = [ "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2610,7 +3644,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.6.0", ] [[package]] @@ -2630,7 +3664,7 @@ checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2786,7 +3820,7 @@ version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.6.0", "errno", "libc", "linux-raw-sys", @@ -2920,7 +3954,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn", + "syn 2.0.68", ] [[package]] @@ -2935,7 +3969,7 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.6.0", "core-foundation", "core-foundation-sys", "libc", @@ -2958,6 +3992,12 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +[[package]] +name = "seq-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" + [[package]] name = "serde" version = "1.0.203" @@ -2969,9 +4009,9 @@ dependencies = [ [[package]] name = "serde_bytes" -version = "0.11.14" +version = "0.11.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b8497c313fd43ab992087548117643f6fcd935cbf36f176ffda0aacf9591734" +checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a" dependencies = [ "serde", ] @@ -2984,7 +4024,7 @@ checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -2995,14 +4035,14 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] name = "serde_json" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" +checksum = "d947f6b3163d8857ea16c4fa0dd4840d52f3041039a85decd46867eb1abef2e4" dependencies = [ "indexmap 2.2.6", "itoa", @@ -3059,7 +4099,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3146,6 +4186,12 @@ dependencies = [ "time", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "slab" version = "0.4.9" @@ -3170,6 +4216,34 @@ dependencies = [ "serde", ] +[[package]] +name = "snafu" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.5.7" @@ -3204,6 +4278,46 @@ dependencies = [ "der", ] +[[package]] +name = "sql" +version = "0.1.0" +dependencies = [ + "async-trait", + "datafusion", + "execute", + "futures", + "hasura-authn-core", + "indexmap 2.2.6", + "metadata-resolve", + "ndc-models", + "open-dds", + "schema", + "serde", + "thiserror", + "tracing-util", +] + +[[package]] +name = "sqlparser" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "295e9930cd7a97e58ca2a070541a3ca502b17f5d1fa7157376d0fabd85324f25" +dependencies = [ + "log", + "sqlparser_derive", +] + +[[package]] +name = "sqlparser_derive" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.68", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -3227,6 +4341,9 @@ name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros", +] [[package]] name = "strum_macros" @@ -3234,11 +4351,11 @@ version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "rustversion", - "syn", + "syn 2.0.68", ] [[package]] @@ -3247,6 +4364,17 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.68" @@ -3321,7 +4449,7 @@ dependencies = [ "glob", "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3347,7 +4475,7 @@ checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3360,6 +4488,17 @@ dependencies = [ "once_cell", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float 2.10.1", +] + 
[[package]] name = "time" version = "0.3.36" @@ -3391,6 +4530,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -3403,9 +4551,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +checksum = "c55115c6fbe2d2bef26eb09ad74bde02d8255476fc0c7b515ef09fbb35742d82" dependencies = [ "tinyvec_macros", ] @@ -3453,7 +4601,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3556,7 +4704,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "bitflags 2.5.0", + "bitflags 2.6.0", "bytes", "futures-core", "futures-util", @@ -3607,7 +4755,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3671,7 +4819,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3680,6 +4828,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "typed-builder" version = "0.18.2" @@ -3697,7 +4855,7 @@ checksum = "1f718dfaf347dcb5b983bfc87608144b0bad87970aebcbea5ce44d2a30c08e63" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -3748,6 +4906,12 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" +[[package]] +name = "unicode-width" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" + [[package]] name = "unreachable" version = "1.0.0" @@ -3788,9 +4952,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.8.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" dependencies = [ "getrandom", "serde", @@ -3866,7 +5030,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.68", "wasm-bindgen-shared", ] @@ -3900,7 +5064,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4119,6 +5283,15 @@ dependencies = [ "tap", ] +[[package]] +name = "xz2" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" +dependencies = [ + "lzma-sys", +] + [[package]] name = "yansi" version = "0.5.1" @@ -4157,7 +5330,7 @@ checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", ] [[package]] @@ -4177,5 +5350,33 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.68", +] + +[[package]] +name = "zstd" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.9+zstd.1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" +dependencies = [ + "cc", + "pkg-config", ] diff --git a/v3/Cargo.toml b/v3/Cargo.toml index c8bdba20753..56fb63b7c23 100644 --- a/v3/Cargo.toml +++ b/v3/Cargo.toml @@ -16,6 +16,7 @@ members = [ "crates/open-dds", "crates/query-usage-analytics", "crates/schema", + "crates/sql", "crates/utils/*", ] diff --git a/v3/crates/engine/Cargo.toml b/v3/crates/engine/Cargo.toml index 48a01dc6d6a..28a06db07e5 100644 --- a/v3/crates/engine/Cargo.toml +++ b/v3/crates/engine/Cargo.toml @@ -27,13 +27,13 @@ lang-graphql = { path = "../lang-graphql" } open-dds = { path = "../open-dds" } opendds-derive = { path = "../utils/opendds-derive" } schema = { path = "../schema" } +sql = { path = "../sql" } tracing-util = { path = "../utils/tracing-util" } metadata-resolve = {path = "../metadata-resolve" } anyhow = { workspace = true } axum = { workspace = true } base64 = { workspace = true } -bincode = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } json_value_merge = { workspace = true } reqwest = { workspace = true, features = ["json", "multipart"] } diff --git a/v3/crates/engine/bin/engine/main.rs b/v3/crates/engine/bin/engine/main.rs index 6fbae997eb8..ed40d95eec1 100644 --- a/v3/crates/engine/bin/engine/main.rs +++ b/v3/crates/engine/bin/engine/main.rs @@ -13,6 +13,7 @@ use axum::{ Extension, Json, Router, }; use clap::Parser; +use reqwest::header::CONTENT_TYPE; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; use tracing_util::{ @@ -61,6 +62,9 @@ struct ServerOptions { /// The port on which the server listens. #[arg(long, value_name = "PORT", env = "PORT", default_value_t = DEFAULT_PORT)] port: u16, + /// Enables the '/v1/sql' endpoint + #[arg(long, env = "ENABLE_SQL_INTERFACE")] + enable_sql_interface: bool, /// Enable CORS. Support preflight request and include related headers in responses. 
#[arg(long, env = "ENABLE_CORS")] enable_cors: bool, @@ -88,6 +92,7 @@ struct EngineState { http_context: HttpContext, schema: gql::schema::Schema, auth_config: AuthConfig, + sql_context: sql::catalog::Context, } #[tokio::main] @@ -156,7 +161,7 @@ async fn shutdown_signal() { enum StartupError { #[error("could not read the auth config - {0}")] ReadAuth(anyhow::Error), - #[error("could not read the schema - {0}")] + #[error("failed to build engine state - {0}")] ReadSchema(anyhow::Error), } @@ -174,6 +179,8 @@ struct EngineRouter { /// The metadata routes for the introspection metadata file. /// Contains /metadata and /metadata-hash routes. metadata_routes: Option, + /// Routes for the SQL interface + sql_routes: Option, /// The CORS layer for the engine. cors_layer: Option, } @@ -233,6 +240,7 @@ impl EngineRouter { Self { base_router: base_routes, metadata_routes: None, + sql_routes: None, cors_layer: None, } } @@ -257,6 +265,25 @@ impl EngineRouter { Ok(()) } + fn add_sql_route(&mut self, state: Arc) { + let sql_routes = Router::new() + .route("/v1/sql", post(handle_sql_request)) + .layer(axum::middleware::from_fn( + hasura_authn_core::resolve_session, + )) + .layer(axum::middleware::from_fn_with_state( + state.clone(), + authentication_middleware, + )) + .layer(axum::middleware::from_fn(sql_request_tracing_middleware)) + // *PLEASE DO NOT ADD ANY MIDDLEWARE + // BEFORE THE `explain_request_tracing_middleware`* + // Refer to it for more details. + .layer(TraceLayer::new_for_http()) + .with_state(state); + self.sql_routes = Some(sql_routes); + } + fn add_cors_layer(&mut self, allow_origin: &[String]) { self.cors_layer = Some(cors::build_cors_layer(allow_origin)); } @@ -264,6 +291,10 @@ impl EngineRouter { fn into_make_service(self) -> axum::routing::IntoMakeService { let mut app = self.base_router; // Merge the metadata routes if they exist. + if let Some(sql_routes) = self.sql_routes { + app = app.merge(sql_routes); + } + // Merge the metadata routes if they exist. if let Some(metadata_routes) = self.metadata_routes { app = app.merge(metadata_routes); } @@ -279,25 +310,20 @@ impl EngineRouter { #[allow(clippy::print_stdout)] async fn start_engine(server: &ServerOptions) -> Result<(), StartupError> { - let auth_config = - read_auth_config(&server.authn_config_path).map_err(StartupError::ReadAuth)?; - let metadata_resolve_flags = resolve_unstable_features(&server.unstable_features); - let schema = read_schema(&server.metadata_path, &metadata_resolve_flags) - .map_err(StartupError::ReadSchema)?; + let state = build_state( + &server.authn_config_path, + &server.metadata_path, + &metadata_resolve_flags, + ) + .map_err(StartupError::ReadSchema)?; - let http_context = HttpContext { - client: reqwest::Client::new(), - ndc_response_size_limit: None, - }; - let state = Arc::new(EngineState { - http_context, - schema, - auth_config, - }); + let mut engine_router = EngineRouter::new(state.clone()); - let mut engine_router = EngineRouter::new(state); + if server.enable_sql_interface { + engine_router.add_sql_route(state.clone()); + } // If `--introspection-metadata` is specified we also serve the file indicated on `/metadata` // and its hash on `/metadata-hash`. @@ -390,6 +416,33 @@ async fn explain_request_tracing_middleware( .response } +/// Middleware to start tracing of the `/v1/sql` request. +/// This middleware must be active for the entire duration +/// of the request i.e. this middleware should be the +/// entry point and the exit point of the SQL request. 
+async fn sql_request_tracing_middleware( + request: Request, + next: Next, +) -> axum::response::Response { + let tracer = tracing_util::global_tracer(); + let path = "/v1/sql"; + tracer + .in_span_async_with_parent_context( + path, + path, + SpanVisibility::User, + &request.headers().clone(), + || { + Box::pin(async move { + let response = next.run(request).await; + TraceableHttpResponse::new(response, path) + }) + }, + ) + .await + .response +} + #[derive(Debug, thiserror::Error)] enum AuthError { #[error("JWT auth error: {0}")] @@ -540,16 +593,78 @@ async fn handle_explain_request( response } -fn read_schema( +/// Handle a SQL request and execute it. +async fn handle_sql_request( + State(state): State<Arc<EngineState>>, + Extension(session): Extension<Session>, + Json(request): Json<sql::execute::SqlRequest>, +) -> axum::response::Response { + let tracer = tracing_util::global_tracer(); + let response = tracer + .in_span_async( "handle_sql_request", "Handle SQL Request", SpanVisibility::User, || { Box::pin(async { sql::execute::execute_sql( &state.sql_context, Arc::new(session), Arc::new(state.http_context.clone()), &request, ) .await }) }, ) .await; + + // Set the span as error if the response contains an error + set_status_on_current_span(&response); + + match response { + Ok(r) => { + let mut response = (axum::http::StatusCode::OK, r).into_response(); + response.headers_mut().insert( + CONTENT_TYPE, + axum::http::HeaderValue::from_static("application/json"), + ); + response + } + Err(e) => ( + axum::http::StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ) + .into_response(), + } +} + +/// Build the engine state - include auth, metadata, and sql context. +fn build_state( + authn_config_path: &PathBuf, metadata_path: &PathBuf, metadata_resolve_flags: &metadata_resolve::MetadataResolveFlagsInternal, -) -> Result<gql::schema::Schema<GDS>, anyhow::Error> { +) -> Result<Arc<EngineState>, anyhow::Error> { + let auth_config = read_auth_config(authn_config_path).map_err(StartupError::ReadAuth)?; let raw_metadata = std::fs::read_to_string(metadata_path)?; let metadata = open_dds::Metadata::from_json_str(&raw_metadata)?; - Ok(engine::build::build_schema( - metadata, - metadata_resolve_flags, - )?)
+ let resolved_metadata = metadata_resolve::resolve(metadata, metadata_resolve_flags)?; + let http_context = HttpContext { + client: reqwest::Client::new(), + ndc_response_size_limit: None, + }; + let sql_context = sql::catalog::Context::from_metadata(&resolved_metadata); + let schema = schema::GDS { + metadata: resolved_metadata, + } + .build_schema()?; + let state = Arc::new(EngineState { + http_context, + schema, + auth_config, + sql_context, + }); + Ok(state) } fn read_auth_config(path: &PathBuf) -> Result<AuthConfig, anyhow::Error> { diff --git a/v3/crates/engine/src/build.rs b/v3/crates/engine/src/build.rs index 3bb43033079..333999ecb9c 100644 --- a/v3/crates/engine/src/build.rs +++ b/v3/crates/engine/src/build.rs @@ -10,14 +10,15 @@ pub enum BuildError { InvalidMetadata(#[from] metadata_resolve::Error), #[error("unable to build schema: {0}")] UnableToBuildSchema(#[from] schema::Error), - #[error("unable to encode schema: {0}")] - EncodingError(#[from] bincode::Error), } pub fn build_schema( metadata: open_dds::Metadata, metadata_resolve_flags: &metadata_resolve::MetadataResolveFlagsInternal, ) -> Result<gql::schema::Schema<GDS>, BuildError> { - let gds = schema::GDS::new(metadata, metadata_resolve_flags)?; + let resolved_metadata = metadata_resolve::resolve(metadata, metadata_resolve_flags)?; + let gds = schema::GDS { + metadata: resolved_metadata, + }; Ok(gds.build_schema()?) } diff --git a/v3/crates/execute/src/ir/permissions.rs b/v3/crates/execute/src/ir/permissions.rs index f4877494277..41ea665a81c 100644 --- a/v3/crates/execute/src/ir/permissions.rs +++ b/v3/crates/execute/src/ir/permissions.rs @@ -75,7 +75,7 @@ pub(crate) fn get_argument_presets( } } -pub(crate) fn process_model_predicate<'s>( +pub fn process_model_predicate<'s>( model_predicate: &'s metadata_resolve::ModelPredicate, session_variables: &SessionVariables, relationships: &mut BTreeMap>, diff --git a/v3/crates/execute/src/ir/relationship.rs b/v3/crates/execute/src/ir/relationship.rs index 7892e704a23..831f3ec2e20 100644 --- a/v3/crates/execute/src/ir/relationship.rs +++ b/v3/crates/execute/src/ir/relationship.rs @@ -31,7 +31,7 @@ use schema::{Annotation, BooleanExpressionAnnotation, InputAnnotation, ModelInpu use schema::{CommandRelationshipAnnotation, CommandTargetSource}; #[derive(Debug, Serialize)] -pub(crate) struct LocalModelRelationshipInfo<'s> { +pub struct LocalModelRelationshipInfo<'s> { pub relationship_name: &'s RelationshipName, pub relationship_type: &'s RelationshipType, pub source_type: &'s Qualified<CustomTypeName>, diff --git a/v3/crates/execute/src/lib.rs b/v3/crates/execute/src/lib.rs index 12b91dbc3f2..66a56d0935f 100644 --- a/v3/crates/execute/src/lib.rs +++ b/v3/crates/execute/src/lib.rs @@ -1,13 +1,14 @@ mod error; mod explain; mod global_id; -mod ir; -mod model_tracking; -mod ndc; +pub mod ir; +pub mod model_tracking; +pub mod ndc; mod plan; mod process_response; mod remote_joins; +pub use plan::process_model_relationship_definition; use plan::ExecuteQueryResult; use thiserror::Error; @@ -28,9 +29,11 @@ use tracing_util::{ // we explicitly export things used by other crates pub use explain::execute_explain; pub use explain::types::{redact_ndc_explain, ExplainResponse}; +pub use ndc::fetch_from_data_connector; pub use plan::{execute_mutation_plan, execute_query_plan, generate_request_plan, RequestPlan}; /// Context for making HTTP requests +#[derive(Debug, Clone)] pub struct HttpContext { /// The HTTP client to use for making requests pub client: reqwest::Client, diff --git a/v3/crates/execute/src/ndc.rs b/v3/crates/execute/src/ndc.rs index
8c59cf4db3a..daf7364ab43 100644 --- a/v3/crates/execute/src/ndc.rs +++ b/v3/crates/execute/src/ndc.rs @@ -56,7 +56,7 @@ pub async fn execute_ndc_query<'n, 's>( .await } -pub(crate) async fn fetch_from_data_connector<'s>( +pub async fn fetch_from_data_connector<'s>( http_context: &HttpContext, query_request: &ndc_models::QueryRequest, data_connector: &metadata_resolve::DataConnectorLink, diff --git a/v3/crates/execute/src/plan.rs b/v3/crates/execute/src/plan.rs index e32bc48f330..87604a711e1 100644 --- a/v3/crates/execute/src/plan.rs +++ b/v3/crates/execute/src/plan.rs @@ -4,6 +4,8 @@ mod model_selection; mod relationships; pub(crate) mod selection_set; +pub use relationships::process_model_relationship_definition; + use gql::normalized_ast; use gql::schema::NamespacedGetter; use hasura_authn_core::Role; diff --git a/v3/crates/execute/src/plan/relationships.rs b/v3/crates/execute/src/plan/relationships.rs index b21ae9b497f..2ab297a918f 100644 --- a/v3/crates/execute/src/plan/relationships.rs +++ b/v3/crates/execute/src/plan/relationships.rs @@ -73,7 +73,7 @@ pub(crate) fn collect_relationships( Ok(()) } -pub(crate) fn process_model_relationship_definition( +pub fn process_model_relationship_definition( relationship_info: &LocalModelRelationshipInfo, ) -> Result { let &LocalModelRelationshipInfo { diff --git a/v3/crates/execute/tests/generate_ir/get_many_model_count/expected.json b/v3/crates/execute/tests/generate_ir/get_many_model_count/expected.json index 8ac675e88f3..91c3d20c49a 100644 --- a/v3/crates/execute/tests/generate_ir/get_many_model_count/expected.json +++ b/v3/crates/execute/tests/generate_ir/get_many_model_count/expected.json @@ -57,6 +57,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "author_id": { @@ -65,6 +66,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "title": { @@ -73,6 +75,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} } } @@ -177,6 +180,7 @@ "type": "named", "name": "int8" }, + "column_type_representation": null, "argument_mappings": {} }, "first_name": { @@ -185,6 +189,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} }, "last_name": { @@ -193,6 +198,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} } } @@ -297,6 +303,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "author_id": { @@ -305,6 +312,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "title": { @@ -313,6 +321,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} } } @@ -656,6 +665,7 @@ "type": "named", "name": "int8" }, + "column_type_representation": null, "argument_mappings": {} }, "first_name": { @@ -664,6 +674,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} }, "last_name": { @@ -672,6 +683,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} } } @@ -711,6 +723,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "author_id": { @@ -719,6 +732,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "title": { @@ -727,6 +741,7 @@ "type": "named", "name": 
"varchar" }, + "column_type_representation": null, "argument_mappings": {} } } @@ -803,6 +818,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "author_id": { @@ -811,6 +827,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "title": { @@ -819,6 +836,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} } } @@ -858,6 +876,7 @@ "type": "named", "name": "int8" }, + "column_type_representation": null, "argument_mappings": {} }, "first_name": { @@ -866,6 +885,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} }, "last_name": { @@ -874,6 +894,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} } } @@ -950,6 +971,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "author_id": { @@ -958,6 +980,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "title": { @@ -966,6 +989,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} } } @@ -1005,6 +1029,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "author_id": { @@ -1013,6 +1038,7 @@ "type": "named", "name": "int4" }, + "column_type_representation": null, "argument_mappings": {} }, "title": { @@ -1021,6 +1047,7 @@ "type": "named", "name": "varchar" }, + "column_type_representation": null, "argument_mappings": {} } } diff --git a/v3/crates/metadata-resolve/src/lib.rs b/v3/crates/metadata-resolve/src/lib.rs index 00a8b5ecf95..bbd4b6475c0 100644 --- a/v3/crates/metadata-resolve/src/lib.rs +++ b/v3/crates/metadata-resolve/src/lib.rs @@ -31,7 +31,7 @@ pub use stages::commands::Command; pub use stages::data_connectors; pub use stages::data_connectors::DataConnectorLink; pub use stages::model_permissions::{ - FilterPermission, ModelPredicate, ModelTargetSource, ModelWithPermissions, + FilterPermission, ModelPredicate, ModelTargetSource, ModelWithPermissions, SelectPermission, }; pub use stages::models::{ConnectorArgumentName, Model, ModelSource}; diff --git a/v3/crates/metadata-resolve/src/stages/object_types/mod.rs b/v3/crates/metadata-resolve/src/stages/object_types/mod.rs index 5770e0e98ac..e016e9b249e 100644 --- a/v3/crates/metadata-resolve/src/stages/object_types/mod.rs +++ b/v3/crates/metadata-resolve/src/stages/object_types/mod.rs @@ -9,6 +9,7 @@ pub use types::{ ResolvedApolloFederationObjectKey, ResolvedObjectApolloFederationConfig, TypeMapping, }; +use crate::helpers::ndc_validation::get_underlying_named_type; use crate::helpers::types::{mk_name, store_new_graphql_type}; use crate::stages::data_connectors; @@ -328,9 +329,17 @@ pub fn resolve_data_connector_type_mapping( ) }; let source_column = get_column(ndc_object_type, field_name, resolved_field_mapping_column)?; + let underlying_column_type = get_underlying_named_type(&source_column.r#type); + let column_type_representation = data_connector_context + .inner + .schema + .scalar_types + .get(underlying_column_type) + .and_then(|scalar_type| scalar_type.representation.clone()); let resolved_field_mapping = FieldMapping { column: resolved_field_mapping_column.clone(), column_type: source_column.r#type.clone(), + column_type_representation, argument_mappings: resolved_argument_mappings.0, }; diff --git 
a/v3/crates/metadata-resolve/src/stages/object_types/types.rs b/v3/crates/metadata-resolve/src/stages/object_types/types.rs index d6a4b1be370..882ebee302a 100644 --- a/v3/crates/metadata-resolve/src/stages/object_types/types.rs +++ b/v3/crates/metadata-resolve/src/stages/object_types/types.rs @@ -129,6 +129,7 @@ pub struct ResolvedApolloFederationObjectKey { pub struct FieldMapping { pub column: DataConnectorColumnName, pub column_type: ndc_models::Type, + pub column_type_representation: Option, pub argument_mappings: BTreeMap, } diff --git a/v3/crates/metadata-resolve/src/types/error.rs b/v3/crates/metadata-resolve/src/types/error.rs index 0686a7565b1..1dfbaed1902 100644 --- a/v3/crates/metadata-resolve/src/types/error.rs +++ b/v3/crates/metadata-resolve/src/types/error.rs @@ -989,7 +989,7 @@ pub enum TypeMappingValidationError { unknown_ndc_field_type_name: String, }, #[error("ndc validation error: {0}")] - NDCValidationError(NDCValidationError), + NDCValidationError(#[from] NDCValidationError), } impl From for Error { diff --git a/v3/crates/sql/Cargo.toml b/v3/crates/sql/Cargo.toml new file mode 100644 index 00000000000..a75980f2ffb --- /dev/null +++ b/v3/crates/sql/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "sql" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +metadata-resolve = {path = "../metadata-resolve" } +open-dds = { path = "../open-dds" } +schema = { path = "../schema" } +execute = { path = "../execute" } +tracing-util = { path = "../utils/tracing-util" } +hasura-authn-core = { path = "../auth/hasura-authn-core" } + +ndc-models = { workspace = true } +indexmap = { workspace = true } +datafusion = { version = "39.0.0", features = ["serde"] } +async-trait = "0.1.80" +futures = "0.3.30" +serde = { workspace = true, features = ["rc"] } +thiserror = { workspace = true } + +[lints] +workspace = true diff --git a/v3/crates/sql/readme.md b/v3/crates/sql/readme.md new file mode 100644 index 00000000000..0421a41b5cd --- /dev/null +++ b/v3/crates/sql/readme.md @@ -0,0 +1,12 @@ +# SQL Interface + +An experimental SQL interface over OpenDD models. This is mostly targeted at AI +use cases for now - GenAI models are better at generating SQL queries than +GraphQL queries. + +This is implemented using the Apache DataFusion Query Engine by deriving the SQL +metadata for datafusion from Open DDS metadata. As the implementation currently +stands, once we get a `LogicalPlan` from datafusion we replace `TableScan`s with +NDC queries to the underlying connector. There is a rudimentary optimizer that +pushes down projections to the ndc query so that we don't fetch all the columns +of a collection. 
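+
+As a rough sketch of how a consumer might drive this (the `run_sql` driver and
+the `app.Articles` table below are illustrative, not part of this PR, and
+`serde_json` is assumed for constructing the request):
+
+```rust
+use std::sync::Arc;
+
+use hasura_authn_core::Session;
+
+async fn run_sql(
+    resolved_metadata: &metadata_resolve::Metadata,
+    session: Arc<Session>,
+    http_context: Arc<execute::HttpContext>,
+) -> Result<Vec<u8>, sql::execute::SqlExecutionError> {
+    // Derive the SQL catalog once from resolved metadata: subgraphs become
+    // schemas, models become tables.
+    let context = sql::catalog::Context::from_metadata(resolved_metadata);
+
+    // `SqlRequest` is normally deserialized from the request body.
+    let request: sql::execute::SqlRequest = serde_json::from_value(
+        serde_json::json!({ "sql": "SELECT title FROM app.Articles LIMIT 10" }),
+    )
+    .expect("valid request");
+
+    // Plans the query with DataFusion, executes any NDC pushdowns, and
+    // returns the result rows serialized as a JSON array.
+    sql::execute::execute_sql(&context, session, http_context, &request).await
+}
+```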
diff --git a/v3/crates/sql/src/catalog.rs b/v3/crates/sql/src/catalog.rs new file mode 100644 index 00000000000..1b7941003e8 --- /dev/null +++ b/v3/crates/sql/src/catalog.rs @@ -0,0 +1,259 @@ +use std::{any::Any, collections::HashMap, sync::Arc}; + +use ::datafusion::{ + execution::{context::SessionState, runtime_env::RuntimeEnv}, + sql::TableReference, +}; +use async_trait::async_trait; +use hasura_authn_core::Session; +use indexmap::IndexMap; +use metadata_resolve::{self as resolved}; +use open_dds::permissions::Role; +use schema::OpenDDSchemaProvider; +use serde::{Deserialize, Serialize}; + +mod datafusion { + pub(super) use datafusion::{ + catalog::{schema::SchemaProvider, CatalogProvider}, + datasource::TableProvider, + error::Result, + prelude::{SessionConfig, SessionContext}, + scalar::ScalarValue, + }; +} + +pub mod introspection; +pub mod schema; +pub mod table; + +/// The context in which to compile and execute SQL queries. +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct Context { + pub(crate) subgraphs: IndexMap, + pub(crate) type_permissions: HashMap>, + pub(crate) introspection: introspection::Introspection, +} + +impl Context { + /// Derive a SQL Context from resolved Open DDS metadata. + pub fn from_metadata(metadata: &resolved::Metadata) -> Self { + let mut subgraphs = IndexMap::new(); + for (model_name, model) in &metadata.models { + let schema_name = &model_name.subgraph; + let table_name = &model_name.name; + let subgraph = + subgraphs + .entry(schema_name.clone()) + .or_insert_with(|| schema::Subgraph { + models: IndexMap::new(), + }); + subgraph.models.insert( + table_name.to_string(), + table::Model::from_resolved_model(model), + ); + } + + let mut type_permissions = HashMap::new(); + for (type_name, object_type) in &metadata.object_types { + for (role, output_permission) in &object_type.type_output_permissions { + let output_permission = table::TypePermission { + output: output_permission.clone(), + }; + let role_permissions = + type_permissions + .entry(role) + .or_insert_with(|| table::TypePermissionsOfRole { + permissions: HashMap::new(), + }); + role_permissions + .permissions + .insert(type_name.clone(), output_permission); + } + } + let introspection = introspection::Introspection::from_metadata(metadata, &subgraphs); + Context { + subgraphs, + type_permissions: type_permissions + .into_iter() + .map(|(role, role_permissions)| (role.clone(), Arc::new(role_permissions))) + .collect(), + introspection, + } + } +} + +pub struct OpenDDCatalogProvider { + schemas: IndexMap>, +} + +impl OpenDDCatalogProvider { + fn new( + session: &Arc, + http_context: &Arc, + context: &Context, + ) -> Self { + let type_permissions = context.type_permissions.get(&session.role).cloned(); + let mut schemas = IndexMap::new(); + for (subgraph_name, subgraph) in &context.subgraphs { + let mut tables = IndexMap::new(); + for model in subgraph.models.values() { + let select_permission = model.permissions.get(&session.role).cloned(); + let provider = table::OpenDDTableProvider { + session: session.clone(), + http_context: http_context.clone(), + name: model.name.clone(), + data_type: model.data_type.clone(), + source: model.source.clone(), + schema: model.schema.clone(), + select_permission, + type_permissions: type_permissions.clone(), + }; + tables.insert(model.name.to_string(), Arc::new(provider)); + } + let provider = HasuraSchemaProvider::OpenDD(schema::OpenDDSchemaProvider { tables }); + schemas.insert(subgraph_name.clone(), Arc::new(provider)); + } + 
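+ // Register the built-in `hasura` introspection schema (table and column + // metadata, inferred foreign keys) alongside the subgraph schemas.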
schemas.insert( + introspection::HASURA_METADATA_SCHEMA.to_string(), + Arc::new(HasuraSchemaProvider::Introspection( + introspection::IntrospectionSchemaProvider::new(&context.introspection), + )), + ); + OpenDDCatalogProvider { schemas } + } + pub(crate) fn get( + &self, + default_schema: Option<&str>, + table: &TableReference, + ) -> Option<&table::OpenDDTableProvider> { + let schema = table.schema().or(default_schema); + let table = table.table(); + if let Some(schema) = schema { + if let HasuraSchemaProvider::OpenDD(schema) = self.schemas.get(schema)?.as_ref() { + schema.tables.get(table).map(std::convert::AsRef::as_ref) + } else { + None + } + } else { + None + } + } +} + +enum HasuraSchemaProvider { + OpenDD(OpenDDSchemaProvider), + Introspection(introspection::IntrospectionSchemaProvider), +} + +#[async_trait] +impl datafusion::SchemaProvider for HasuraSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + match self { + HasuraSchemaProvider::OpenDD(schema) => schema.table_names(), + HasuraSchemaProvider::Introspection(schema) => schema.table_names(), + } + } + + async fn table( + &self, + name: &str, + ) -> datafusion::Result>> { + match self { + HasuraSchemaProvider::OpenDD(schema) => schema.table(name).await, + HasuraSchemaProvider::Introspection(schema) => schema.table(name).await, + } + } + + fn table_exist(&self, name: &str) -> bool { + match self { + HasuraSchemaProvider::OpenDD(schema) => schema.table_exist(name), + HasuraSchemaProvider::Introspection(schema) => schema.table_exist(name), + } + } +} + +impl datafusion::CatalogProvider for OpenDDCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.schemas.keys().cloned().collect() + } + + fn schema(&self, name: &str) -> Option> { + self.schemas + .get(name) + .cloned() + .map(|schema| schema as Arc) + } +} + +impl Context { + pub fn create_session_context( + &self, + session: &Arc, + http_context: &Arc, + ) -> datafusion::SessionContext { + let default_schema_name = if self.subgraphs.len() == 1 { + self.subgraphs.get_index(0).map(|v| v.0) + } else { + None + }; + let session_config = datafusion::SessionConfig::new() + .set( + "datafusion.catalog.default_catalog", + datafusion::ScalarValue::Utf8(Some("default".to_string())), + ) + .set( + "datafusion.catalog.information_schema", + datafusion::ScalarValue::Boolean(Some(true)), + ) + .set( + "datafusion.execution.target_partitions", + datafusion::ScalarValue::Int32(Some(1)), + ) + .set( + "datafusion.execution.planning_concurrency", + datafusion::ScalarValue::Int32(Some(1)), + ) + .set( + "datafusion.sql_parser.enable_ident_normalization", + datafusion::ScalarValue::Boolean(Some(false)), + ); + + let session_config = if let Some(default_schema_name) = default_schema_name { + session_config.set( + "datafusion.catalog.default_schema", + datafusion::ScalarValue::Utf8(Some(default_schema_name.clone())), + ) + } else { + session_config + }; + let catalog = Arc::new(OpenDDCatalogProvider::new(session, http_context, self)); + let query_planner = Arc::new(super::execute::planner::NDCQueryPlanner { + default_schema: default_schema_name.map(|s| Arc::new(s.clone())), + catalog: catalog.clone(), + }); + let session_state = + SessionState::new_with_config_rt(session_config, Arc::new(RuntimeEnv::default())) + .with_analyzer_rules(vec![Arc::new( + super::execute::analyzer::ReplaceTableScan::new( + default_schema_name.map(|s| Arc::new(s.clone())), + catalog.clone(), + ), + )]) + 
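+ // `ReplaceTableScan` has rewritten scans of OpenDD tables into `NDCQuery` + // nodes; the optimizer below pushes projections into them and the query + // planner plans them as `NDCPushDown` execution plans.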
.with_query_planner(query_planner) + .add_optimizer_rule(Arc::new( + super::execute::optimizer::NDCPushDownProjection {}, + )); + let session_context = datafusion::SessionContext::new_with_state(session_state); + session_context + .register_catalog("default", catalog as Arc); + session_context + } +} diff --git a/v3/crates/sql/src/catalog/introspection.rs b/v3/crates/sql/src/catalog/introspection.rs new file mode 100644 index 00000000000..0973a1ec864 --- /dev/null +++ b/v3/crates/sql/src/catalog/introspection.rs @@ -0,0 +1,354 @@ +//! Describe and populate the introspection tables used by data fusion. + +use std::{any::Any, sync::Arc}; + +use async_trait::async_trait; +use indexmap::IndexMap; +use metadata_resolve::{self as resolved, ModelRelationshipTarget}; +mod df { + pub(super) use datafusion::{ + arrow::{ + array::RecordBatch, + datatypes::{DataType, Field, Schema, SchemaRef}, + }, + catalog::schema::SchemaProvider, + common::ScalarValue, + datasource::{TableProvider, TableType}, + error::Result, + execution::context::SessionState, + logical_expr::Expr, + physical_plan::{values::ValuesExec, ExecutionPlan}, + }; +} +use open_dds::relationships::RelationshipType; +use serde::{Deserialize, Serialize}; + +pub const HASURA_METADATA_SCHEMA: &str = "hasura"; +pub const TABLE_METADATA: &str = "table_metadata"; +pub const COLUMN_METADATA: &str = "column_metadata"; +pub const INFERRED_FOREIGN_KEY_CONSTRAINTS: &str = "inferred_foreign_key_constraints"; + +/// Describes the database schema structure and metadata. +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub(crate) struct Introspection { + pub(crate) table_metadata: TableMetadata, + pub(crate) column_metadata: ColumnMetadata, + pub(crate) inferred_foreign_key_constraints: InferredForeignKeys, +} + +impl Introspection { + /// Derive SQL schema from the Open DDS metadata. + pub fn from_metadata( + metadata: &resolved::Metadata, + schemas: &IndexMap, + ) -> Self { + let mut table_metadata_rows = Vec::new(); + let mut column_metadata_rows = Vec::new(); + let mut foreign_key_constraint_rows = Vec::new(); + for (schema_name, schema) in schemas { + for (table_name, table) in &schema.models { + table_metadata_rows.push(TableRow::new( + schema_name.clone(), + table_name.to_string(), + table.description.clone(), + )); + for (column_name, column_description) in &table.columns { + column_metadata_rows.push(ColumnRow { + schema_name: schema_name.clone(), + table_name: table_name.clone(), + column_name: column_name.clone(), + description: column_description.clone(), + }); + } + + // TODO: + // 1. Need to check if the target_model is part of subgraphs + // 2. 
Need to also check for array relationships in case the corresponding + // object relationship isn't present + if let Some(object_type) = metadata.object_types.get(&table.data_type) { + for relationship in object_type.relationship_fields.values() { + if let metadata_resolve::RelationshipTarget::Model( + ModelRelationshipTarget { + model_name, + relationship_type: RelationshipType::Object, + target_typename: _, + mappings, + }, + ) = &relationship.target + { + for mapping in mappings { + foreign_key_constraint_rows.push(ForeignKeyRow { + from_schema_name: schema_name.clone(), + from_table_name: table_name.clone(), + from_column_name: mapping.source_field.field_name.to_string(), + to_schema_name: model_name.subgraph.clone(), + to_table_name: model_name.name.to_string(), + to_column_name: mapping.target_field.field_name.to_string(), + }); + } + } + } + } + } + } + Introspection { + table_metadata: TableMetadata::new(table_metadata_rows), + column_metadata: ColumnMetadata::new(column_metadata_rows), + inferred_foreign_key_constraints: InferredForeignKeys::new(foreign_key_constraint_rows), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub(crate) struct TableMetadata { + schema: df::SchemaRef, + rows: Vec, +} + +impl TableMetadata { + pub(crate) fn new(rows: Vec) -> Self { + let schema_name = df::Field::new("schema_name", df::DataType::Utf8, false); + let table_name = df::Field::new("table_name", df::DataType::Utf8, false); + let description = df::Field::new("description", df::DataType::Utf8, true); + let schema = + df::SchemaRef::new(df::Schema::new(vec![schema_name, table_name, description])); + TableMetadata { schema, rows } + } +} + +impl TableMetadata { + fn to_values_table(&self) -> ValuesTable { + ValuesTable { + schema: self.schema.clone(), + rows: self + .rows + .iter() + .map(|row| { + vec![ + df::ScalarValue::Utf8(Some(row.schema_name.clone())), + df::ScalarValue::Utf8(Some(row.table_name.clone())), + df::ScalarValue::Utf8(row.description.clone()), + ] + }) + .collect(), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub(crate) struct TableRow { + schema_name: String, + table_name: String, + description: Option, +} + +impl TableRow { + pub(crate) fn new( + schema_name: String, + table_name: String, + description: Option, + ) -> Self { + Self { + schema_name, + table_name, + description, + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub(crate) struct ColumnMetadata { + pub(crate) schema: df::SchemaRef, + pub(crate) rows: Vec, +} + +impl ColumnMetadata { + fn new(rows: Vec) -> Self { + let schema_name = df::Field::new("schema_name", df::DataType::Utf8, false); + let table_name = df::Field::new("table_name", df::DataType::Utf8, false); + let column_name = df::Field::new("column_name", df::DataType::Utf8, false); + let description = df::Field::new("description", df::DataType::Utf8, true); + let schema = df::SchemaRef::new(df::Schema::new(vec![ + schema_name, + table_name, + column_name, + description, + ])); + ColumnMetadata { schema, rows } + } + fn to_values_table(&self) -> ValuesTable { + ValuesTable { + schema: self.schema.clone(), + rows: self + .rows + .iter() + .map(|row| { + vec![ + df::ScalarValue::Utf8(Some(row.schema_name.clone())), + df::ScalarValue::Utf8(Some(row.table_name.clone())), + df::ScalarValue::Utf8(Some(row.column_name.clone())), + df::ScalarValue::Utf8(row.description.clone()), + ] + }) + .collect(), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub(crate) 
struct ColumnRow { + schema_name: String, + table_name: String, + column_name: String, + description: Option, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub(crate) struct InferredForeignKeys { + schema: df::SchemaRef, + rows: Vec, +} + +impl InferredForeignKeys { + fn new(rows: Vec) -> Self { + let from_schema_name = df::Field::new("from_schema_name", df::DataType::Utf8, false); + let from_table_name = df::Field::new("from_table_name", df::DataType::Utf8, false); + let from_column_name = df::Field::new("from_column_name", df::DataType::Utf8, false); + let to_schema_name = df::Field::new("to_schema_name", df::DataType::Utf8, false); + let to_table_name = df::Field::new("to_table_name", df::DataType::Utf8, false); + let to_column_name = df::Field::new("to_column_name", df::DataType::Utf8, false); + let schema = df::SchemaRef::new(df::Schema::new(vec![ + from_schema_name, + from_table_name, + from_column_name, + to_schema_name, + to_table_name, + to_column_name, + ])); + InferredForeignKeys { schema, rows } + } + fn to_values_table(&self) -> ValuesTable { + ValuesTable { + schema: self.schema.clone(), + rows: self + .rows + .iter() + .map(|row| { + vec![ + df::ScalarValue::Utf8(Some(row.from_schema_name.clone())), + df::ScalarValue::Utf8(Some(row.from_table_name.clone())), + df::ScalarValue::Utf8(Some(row.from_column_name.clone())), + df::ScalarValue::Utf8(Some(row.to_schema_name.clone())), + df::ScalarValue::Utf8(Some(row.to_table_name.clone())), + df::ScalarValue::Utf8(Some(row.to_column_name.clone())), + ] + }) + .collect(), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +struct ForeignKeyRow { + from_schema_name: String, + from_table_name: String, + from_column_name: String, + to_schema_name: String, + to_table_name: String, + to_column_name: String, +} + +pub(crate) struct IntrospectionSchemaProvider { + tables: IndexMap>, +} + +impl IntrospectionSchemaProvider { + pub(crate) fn new(introspection: &Introspection) -> Self { + let tables = [ + ( + TABLE_METADATA, + introspection.table_metadata.to_values_table(), + ), + ( + COLUMN_METADATA, + introspection.column_metadata.to_values_table(), + ), + ( + INFERRED_FOREIGN_KEY_CONSTRAINTS, + introspection + .inferred_foreign_key_constraints + .to_values_table(), + ), + ] + .into_iter() + .map(|(k, table)| (k.to_string(), Arc::new(table) as Arc)) + .collect(); + IntrospectionSchemaProvider { tables } + } +} + +#[async_trait] +impl df::SchemaProvider for IntrospectionSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.tables.keys().cloned().collect::>() + } + + async fn table( + &self, + name: &str, + ) -> datafusion::error::Result>> { + Ok(self.tables.get(name).cloned()) + } + + fn table_exist(&self, name: &str) -> bool { + self.tables.contains_key(name) + } +} + +// A table with static rows +struct ValuesTable { + schema: df::SchemaRef, + rows: Vec>, +} + +#[async_trait] +impl df::TableProvider for ValuesTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> df::SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> df::TableType { + df::TableType::View + } + async fn scan( + &self, + _state: &df::SessionState, + projection: Option<&Vec>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[df::Expr], + _limit: Option, + ) -> datafusion::error::Result> { + let projected_schema = Arc::new(self.schema.project(projection.unwrap_or(&vec![]))?); + let columnar_projection = 
projection + .unwrap_or(&vec![]) + .iter() + .map(|j| self.rows.iter().map(|row| row[*j].clone())) + .map(df::ScalarValue::iter_to_array) + .collect::>>()?; + Ok(Arc::new(df::ValuesExec::try_new_from_batches( + projected_schema.clone(), + vec![df::RecordBatch::try_new( + projected_schema, + columnar_projection, + )?], + )?)) + } +} diff --git a/v3/crates/sql/src/catalog/schema.rs b/v3/crates/sql/src/catalog/schema.rs new file mode 100644 index 00000000000..98f08e0f067 --- /dev/null +++ b/v3/crates/sql/src/catalog/schema.rs @@ -0,0 +1,44 @@ +use async_trait::async_trait; +use std::{any::Any, sync::Arc}; + +use indexmap::IndexMap; +use serde::{Deserialize, Serialize}; + +mod df { + pub(super) use datafusion::error::Result; + pub(super) use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider}; +} + +use crate::catalog; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub(crate) struct Subgraph { + pub models: IndexMap, +} + +pub struct OpenDDSchemaProvider { + pub(crate) tables: IndexMap>, +} + +#[async_trait] +impl df::SchemaProvider for OpenDDSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.tables.keys().cloned().collect::>() + } + + async fn table(&self, name: &str) -> df::Result>> { + Ok(self + .tables + .get(name) + .cloned() + .map(|table| table as Arc)) + } + + fn table_exist(&self, name: &str) -> bool { + self.tables.contains_key(name) + } +} diff --git a/v3/crates/sql/src/catalog/table.rs b/v3/crates/sql/src/catalog/table.rs new file mode 100644 index 00000000000..b3a2d15aeb4 --- /dev/null +++ b/v3/crates/sql/src/catalog/table.rs @@ -0,0 +1,256 @@ +//! Describe a model for a SQL table and how to translate datafusion operations on the table +//! to ndc-spec queries. + +use std::collections::HashMap; +use std::{any::Any, sync::Arc}; + +use async_trait::async_trait; +use datafusion::common::internal_err; +use hasura_authn_core::Session; +use indexmap::IndexMap; +use metadata_resolve::{self as resolved, Qualified, SelectPermission}; +use open_dds::permissions::Role; +use open_dds::{ + models::ModelName, + types::{CustomTypeName, FieldName}, +}; + +use serde::{Deserialize, Serialize}; +mod df { + pub(super) use datafusion::arrow::datatypes::Field; + pub(super) use datafusion::{ + arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef}, + datasource::{TableProvider, TableType}, + execution::context::SessionState, + logical_expr::Expr, + physical_plan::ExecutionPlan, + }; +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub(crate) struct TypePermission { + pub output: open_dds::permissions::TypeOutputPermission, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub(crate) struct TypePermissionsOfRole { + pub(crate) permissions: HashMap, TypePermission>, +} + +fn get_type_representation<'a>( + model: &'a resolved::Model, + field: &FieldName, +) -> Option<&'a ndc_models::TypeRepresentation> { + model + .source + .as_ref() + .and_then(|source| { + source + .type_mappings + .get(&model.data_type) + .map(|type_mapping| { + let resolved::TypeMapping::Object { field_mappings, .. 
} = type_mapping; + field_mappings + .get(field) + .map(|mapping| mapping.column_type_representation.as_ref()) + }) + }) + .flatten() + .flatten() +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub(crate) struct Model { + pub name: ModelName, + + pub description: Option<String>, + + // Datafusion table schema + pub schema: df::SchemaRef, + + // For now, descriptions of fields + pub columns: IndexMap<String, Option<String>>, + + // This is the entry point for the type mappings stored + // in ModelSource + pub data_type: Qualified<CustomTypeName>, + + // The underlying source to execute ndc queries + pub source: Option<Arc<resolved::ModelSource>>, + + // Permissions for the model. Note that the type permissions will need to be retrieved from the + // global context + pub permissions: HashMap<Role, Arc<SelectPermission>>, +} + +impl Model { + pub fn from_resolved_model(model: &resolved::ModelWithPermissions) -> Self { + let (schema, columns) = { + let mut columns = IndexMap::new(); + let mut builder = df::SchemaBuilder::new(); + for (field_name, field_definition) in &model.model.type_fields { + let ndc_type_representation = get_type_representation(&model.model, field_name); + let field_type = + to_arrow_type(&field_definition.field_type, ndc_type_representation); + if let Some(field_type) = field_type { + builder.push(df::Field::new( + field_name.to_string(), + field_type, + field_definition.field_type.nullable, + )); + let description = if let Some(ndc_models::TypeRepresentation::Enum { one_of }) = + ndc_type_representation + { + // TODO: Instead of stuffing the possible enum values in description, + // surface them in the metadata tables. + Some( + field_definition + .description + .clone() + .unwrap_or_else(String::new) + + &format!(" Possible values: {}", one_of.join(", ")), + ) + } else { + field_definition.description.clone() + }; + columns.insert(field_name.to_string(), description); + } + } + let fields = builder.finish().fields; + (df::SchemaRef::new(df::Schema::new(fields)), columns) + }; + + let permissions = model + .select_permissions + .iter() + .map(|(role, select_permission)| (role.clone(), Arc::new(select_permission.clone()))) + .collect(); + + Model { + name: model.model.name.name.clone(), + description: model.model.raw.description.clone(), + schema, + data_type: model.model.data_type.clone(), + source: model + .model + .source + .as_ref() + .map(|source| Arc::new(source.clone())), + columns, + permissions, + } + } +} + +/// Converts an OpenDD type to an arrow type. 
+/// TODO: need to handle complex types +#[allow(clippy::match_same_arms)] +fn to_arrow_type( + ty: &resolved::QualifiedTypeReference, + ndc_type_representation: Option<&ndc_models::TypeRepresentation>, +) -> Option { + match &ty.underlying_type { + resolved::QualifiedBaseType::Named(resolved::QualifiedTypeName::Inbuilt(inbuilt_type)) => { + let data_type = match inbuilt_type { + open_dds::types::InbuiltType::ID => df::DataType::Utf8, + open_dds::types::InbuiltType::Int => df::DataType::Int32, + open_dds::types::InbuiltType::Float => df::DataType::Float32, + open_dds::types::InbuiltType::Boolean => df::DataType::Boolean, + open_dds::types::InbuiltType::String => df::DataType::Utf8, + }; + Some(data_type) + } + resolved::QualifiedBaseType::Named(resolved::QualifiedTypeName::Custom(custom_type)) => { + if let Some(type_representation) = ndc_type_representation { + match type_representation { + ndc_models::TypeRepresentation::Boolean => Some(df::DataType::Boolean), + ndc_models::TypeRepresentation::String => Some(df::DataType::Utf8), + ndc_models::TypeRepresentation::Int8 => Some(df::DataType::Int8), + ndc_models::TypeRepresentation::Int16 => Some(df::DataType::Int16), + ndc_models::TypeRepresentation::Int32 => Some(df::DataType::Int32), + ndc_models::TypeRepresentation::Int64 => Some(df::DataType::Int64), + ndc_models::TypeRepresentation::Float32 => Some(df::DataType::Float32), + ndc_models::TypeRepresentation::Float64 => Some(df::DataType::Float64), + // Can't do anything better for BigInteger, so we just use String. + ndc_models::TypeRepresentation::BigInteger => Some(df::DataType::Utf8), + // BigDecimal128 is not supported by arrow. + ndc_models::TypeRepresentation::BigDecimal => Some(df::DataType::Float64), + ndc_models::TypeRepresentation::UUID => Some(df::DataType::Utf8), + ndc_models::TypeRepresentation::Date => Some(df::DataType::Date32), + ndc_models::TypeRepresentation::Timestamp => Some(df::DataType::Timestamp( + datafusion::arrow::datatypes::TimeUnit::Microsecond, + None, + )), + ndc_models::TypeRepresentation::TimestampTZ => Some(df::DataType::Timestamp( + datafusion::arrow::datatypes::TimeUnit::Microsecond, + None, + )), + ndc_models::TypeRepresentation::Enum { .. } => Some(df::DataType::Utf8), + _ => None, + } + } else { + match custom_type.name.to_string().to_lowercase().as_str() { + "bool" => Some(df::DataType::Boolean), + "int8" => Some(df::DataType::Int8), + "int16" => Some(df::DataType::Int16), + "int32" => Some(df::DataType::Int32), + "int64" => Some(df::DataType::Int64), + "float32" => Some(df::DataType::Float32), + "float64" => Some(df::DataType::Float64), + "varchar" => Some(df::DataType::Utf8), + "text" => Some(df::DataType::Utf8), + "timestamp" => Some(df::DataType::Timestamp( + datafusion::arrow::datatypes::TimeUnit::Microsecond, + None, + )), + "timestamptz" => Some(df::DataType::Timestamp( + datafusion::arrow::datatypes::TimeUnit::Microsecond, + None, + )), + // BigDecimal128 is not supported by arrow. 
+ "bigdecimal" => Some(df::DataType::Float64), + _ => None, + } + } + } + resolved::QualifiedBaseType::List(_) => None, + } +} + +#[derive(Debug, Clone)] +pub(crate) struct OpenDDTableProvider { + pub(crate) session: Arc, + pub(crate) http_context: Arc, + pub(crate) name: ModelName, + pub(crate) data_type: Qualified, + pub(crate) source: Option>, + pub(crate) schema: df::SchemaRef, + pub(crate) select_permission: Option>, + pub(crate) type_permissions: Option>, +} + +#[async_trait] +impl df::TableProvider for OpenDDTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> df::SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> df::TableType { + df::TableType::Base + } + + async fn scan( + &self, + _state: &df::SessionState, + _projection: Option<&Vec>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[df::Expr], + _limit: Option, + ) -> datafusion::error::Result> { + internal_err!("scan shouldn't be called") + } +} diff --git a/v3/crates/sql/src/execute.rs b/v3/crates/sql/src/execute.rs new file mode 100644 index 00000000000..dc9a6b8723b --- /dev/null +++ b/v3/crates/sql/src/execute.rs @@ -0,0 +1,158 @@ +use std::sync::Arc; + +use datafusion::{ + arrow::{array::RecordBatch, error::ArrowError, json::writer::JsonArray, json::WriterBuilder}, + dataframe::DataFrame, + error::DataFusionError, +}; +use hasura_authn_core::Session; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +use tracing_util::{ErrorVisibility, SpanVisibility, Successful, TraceableError}; + +pub use datafusion::execution::context::SessionContext; + +pub(crate) mod analyzer; +pub(crate) mod optimizer; +pub(crate) mod planner; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SqlRequest { + sql: String, +} + +#[derive(Error, Debug, Clone)] +pub enum SqlExecutionError { + #[error("error in data fusion: {0}")] + DataFusion(String), + #[error("error in encoding data: {0}")] + Arrow(String), +} + +impl From for SqlExecutionError { + fn from(e: DataFusionError) -> Self { + Self::DataFusion(e.to_string()) + } +} + +impl From for SqlExecutionError { + fn from(e: ArrowError) -> Self { + Self::Arrow(e.to_string()) + } +} + +impl TraceableError for SqlExecutionError { + fn visibility(&self) -> ErrorVisibility { + ErrorVisibility::User + } +} + +/// Executes an SQL Request using the Apache DataFusion query engine. 
+pub async fn execute_sql( + context: &crate::catalog::Context, + session: Arc, + http_context: Arc, + request: &SqlRequest, +) -> Result, SqlExecutionError> { + let tracer = tracing_util::global_tracer(); + let session_context = tracer + .in_span( + "create_session_context", + "Create a datafusion SessionContext", + SpanVisibility::Internal, + || { + let session = context.create_session_context(&session, &http_context); + Successful::new(session) + }, + ) + .into_inner(); + let data_frame = tracer + .in_span_async( + "create_logical_plan", + "Creates a Logical Plan for the given SQL statement", + SpanVisibility::User, + || { + Box::pin(async { + session_context + .sql(&request.sql) + .await + .map_err(|e| SqlExecutionError::DataFusion(e.to_string())) + }) + }, + ) + .await?; + let batches = tracer + .in_span_async( + "execute_logical_plan", + "Executes the Logical Plan of a query", + SpanVisibility::User, + || Box::pin(async { execute_logical_plan(data_frame).await }), + ) + .await?; + tracer.in_span( + "serialize_record_batch", + "Serializes datafusion's RecordBatch into a JSON array", + SpanVisibility::User, + || record_batches_to_json_array(&batches), + ) +} + +async fn execute_logical_plan(frame: DataFrame) -> Result, SqlExecutionError> { + let tracer = tracing_util::global_tracer(); + let task_ctx = frame.task_ctx(); + let session_config = task_ctx.session_config().clone(); + let plan = tracer + .in_span_async( + "create_physical_plan", + "Creates a physical plan from a logical plan", + SpanVisibility::User, + || { + Box::pin(async { + frame + .create_physical_plan() + .await + .map_err(|e| SqlExecutionError::DataFusion(e.to_string())) + }) + }, + ) + .await?; + let record_batches = tracer + .in_span_async( + "execute_physical_plan", + "Executes a physical plan to collect record batches", + SpanVisibility::User, + || { + let task_ctx = Arc::new(task_ctx.with_session_config( + session_config.with_extension(Arc::new(tracing_util::Context::current())), + )); + Box::pin(async { + datafusion::physical_plan::collect(plan, task_ctx) + .await + .map_err(|e| SqlExecutionError::DataFusion(e.to_string())) + }) + }, + ) + .await?; + Ok(record_batches) +} + +fn record_batches_to_json_array(batches: &[RecordBatch]) -> Result, SqlExecutionError> { + if batches.is_empty() { + return Ok(vec![b'[', b']']); + } + // Write the record batch out as a JSON array + let buf = Vec::new(); + + let builder = WriterBuilder::new().with_explicit_nulls(true); + let mut writer = builder.build::<_, JsonArray>(buf); + + for batch in batches { + writer.write(batch)?; + } + writer.finish()?; + + // Get the underlying buffer back, + Ok(writer.into_inner()) +} diff --git a/v3/crates/sql/src/execute/analyzer.rs b/v3/crates/sql/src/execute/analyzer.rs new file mode 100644 index 00000000000..7ca7b7639d7 --- /dev/null +++ b/v3/crates/sql/src/execute/analyzer.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Analyzer rule that replaces `TableScan`s over OpenDD models +//! with NDC query nodes. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use datafusion::{ + common::{ + config::ConfigOptions, + tree_node::{Transformed, TransformedResult, TreeNode}, + Result, + }, + error::DataFusionError, + logical_expr::{logical_plan::LogicalPlan, Extension, TableScan}, + optimizer::AnalyzerRule, +}; +use indexmap::IndexMap; +use metadata_resolve::{self as resolved}; +use open_dds::identifier::Identifier; +use open_dds::types::FieldName; + +use crate::plan::NDCQuery; + +/// Analyzer rule that rewrites each `TableScan` of an OpenDD model +/// into an [`NDCQuery`] extension node +pub struct ReplaceTableScan { + default_schema: Option<Arc<String>>, + catalog: Arc<crate::catalog::OpenDDCatalogProvider>, +} + +impl ReplaceTableScan { + pub fn new( + default_schema: Option<Arc<String>>, + catalog: Arc<crate::catalog::OpenDDCatalogProvider>, + ) -> Self { + Self { + default_schema, + catalog, + } + } +} + +impl AnalyzerRule for ReplaceTableScan { + fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> { + plan.transform_up(|n| { + analyze_internal( + self.default_schema.as_ref().map(|x| x.as_str()), + &self.catalog, + n, + ) + }) + .data() + } + + fn name(&self) -> &str { + "replace_table_scan_with_ndc_query" + } +} + +fn analyze_internal( + default_schema: Option<&str>, + catalog: &crate::catalog::OpenDDCatalogProvider, + plan: LogicalPlan, +) -> Result<Transformed<LogicalPlan>> { + // rewrite any subqueries in the plan first + let transformed_plan = plan.map_subqueries(|plan| { + plan.transform_up(|n| analyze_internal(default_schema, catalog, n)) + })?; + + let transformed_plan = transformed_plan.transform_data(|plan| match plan { + LogicalPlan::TableScan(TableScan { + table_name, + source: _, + projection: _, + projected_schema, + filters: _, + fetch: _, + }) if table_name.schema() != Some("hasura") => { + let table = catalog.get(default_schema, &table_name).ok_or_else(|| { + DataFusionError::Internal(format!( + "table provider not found for replace_table_scan: {table_name}" + )) + })?; + let model_source = table.source.as_ref().ok_or_else(|| { + DataFusionError::Plan(format!( + "model source should be configured for {}", + table.name + )) + })?; + let mut ndc_fields = IndexMap::new(); + + let base_type_fields = { + let base_type_mapping = model_source + .type_mappings + .get(&table.data_type) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "couldn't fetch type_mapping of type {} for model {}", + table.data_type, table.name + )) + })?; + match base_type_mapping { + resolved::TypeMapping::Object { + ndc_object_type_name: _, + field_mappings, + } => field_mappings, + } + }; + for field in projected_schema.fields() { + let field_name = { + let field_name = Identifier::new(field.name().clone()).map_err(|e| { + DataFusionError::Internal(format!( + "field name conversion failed {}: {}", + field.name(), + e + )) + })?; + FieldName(field_name) + }; + let ndc_field = { + base_type_fields + .get(&field_name) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "couldn't fetch field mapping of field {} in type {} for model {}", + field_name, table.data_type, table.name + 
)) + }) + .map(|field_mapping| field_mapping.column.clone()) + }?; + ndc_fields.insert( + field.name().clone(), + ndc_models::Field::Column { + column: ndc_field.to_string(), + fields: None, + arguments: BTreeMap::new(), + }, + ); + } + + let ndc_query = ndc_models::Query { + aggregates: None, + fields: Some(ndc_fields), + limit: None, + offset: None, + order_by: None, + predicate: None, + }; + + let query_request = ndc_models::QueryRequest { + query: ndc_query, + collection: model_source.collection.clone(), + arguments: BTreeMap::new(), + collection_relationships: BTreeMap::new(), + variables: None, + }; + let ndc_query_node = NDCQuery { + table: table_name.clone(), + query: query_request, + data_source_name: Arc::new(model_source.collection.clone()), + schema: projected_schema, + }; + Ok(Transformed::yes(LogicalPlan::Extension(Extension { + node: Arc::new(ndc_query_node), + }))) + } + _ => Ok(Transformed::no(plan)), + })?; + + Ok(transformed_plan) +} diff --git a/v3/crates/sql/src/execute/optimizer.rs b/v3/crates/sql/src/execute/optimizer.rs new file mode 100644 index 00000000000..43eb6d402ca --- /dev/null +++ b/v3/crates/sql/src/execute/optimizer.rs @@ -0,0 +1,3 @@ +mod projection_pushdown; + +pub(crate) use projection_pushdown::NDCPushDownProjection; diff --git a/v3/crates/sql/src/execute/optimizer/projection_pushdown.rs b/v3/crates/sql/src/execute/optimizer/projection_pushdown.rs new file mode 100644 index 00000000000..aaa61988e3f --- /dev/null +++ b/v3/crates/sql/src/execute/optimizer/projection_pushdown.rs @@ -0,0 +1,71 @@ +use std::sync::Arc; + +use datafusion::{ + common::{internal_err, tree_node::Transformed}, + error::Result, + logical_expr::{Expr, Extension, LogicalPlan}, + optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule}, +}; + +pub(crate) struct NDCPushDownProjection {} + +impl OptimizerRule for NDCPushDownProjection { + fn try_optimize( + &self, + _plan: &LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + internal_err!("Should have called NDCPushDownProjection::rewrite") + } + + fn name(&self) -> &str { + "ndc_pushdown_projection" + } + fn apply_order(&self) -> Option { + Some(ApplyOrder::BottomUp) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + if let Some((projections, projected_schema, ndc_query)) = { + match plan { + LogicalPlan::Projection(ref projection) => match projection.input.as_ref() { + LogicalPlan::Extension(Extension { node }) => node + .as_ref() + .as_any() + .downcast_ref::() + .map(|ndc_query| (&projection.expr, &projection.schema, ndc_query.clone())), + _ => None, + }, + _ => None, + } + } { + let projected_columns = projections_to_columns(projections)?; + let projected_query = + ndc_query.project(projected_schema.clone(), &projected_columns)?; + let plan = LogicalPlan::Extension(Extension { + node: Arc::new(projected_query), + }); + Ok(Transformed::yes(plan)) + } else { + Ok(Transformed::no(plan)) + } + } +} + +fn projections_to_columns(projections: &[Expr]) -> Result> { + projections + .iter() + .map(|expr| match expr { + Expr::Column(column) => Ok(column.name.clone()), + _ => internal_err!("non-column found in projection of ndcscan: {}", expr), + }) + .collect() +} diff --git a/v3/crates/sql/src/execute/planner.rs b/v3/crates/sql/src/execute/planner.rs new file mode 100644 index 00000000000..fce83d8df84 --- /dev/null +++ b/v3/crates/sql/src/execute/planner.rs @@ -0,0 +1,182 @@ +use std::{collections::BTreeMap, 
sync::Arc}; + +use datafusion::{ + error::{DataFusionError, Result}, + execution::context::{QueryPlanner, SessionState}, + logical_expr::{LogicalPlan, UserDefinedLogicalNode}, + physical_plan::ExecutionPlan, + physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}, +}; + +use execute::process_model_relationship_definition; +use indexmap::IndexMap; +use metadata_resolve::FilterPermission; +use open_dds::identifier::Identifier; +use open_dds::types::FieldName; + +use crate::plan::{NDCPushDown, NDCQuery}; + +use async_trait::async_trait; + +pub(crate) struct NDCQueryPlanner { + pub(crate) default_schema: Option<Arc<String>>, + pub(crate) catalog: Arc<crate::catalog::OpenDDCatalogProvider>, +} + +#[async_trait] +impl QueryPlanner for NDCQueryPlanner { + /// Given a `LogicalPlan` created from above, create an + /// `ExecutionPlan` suitable for execution + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + session_state: &SessionState, + ) -> Result<Arc<dyn ExecutionPlan>> { + // Teach the default physical planner how to plan NDCQuery nodes. + let physical_planner = + DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(NDCPushDownPlanner { + default_schema: self.default_schema.clone(), + catalog: self.catalog.clone(), + })]); + // Delegate most work of physical planning to the default physical planner + physical_planner + .create_physical_plan(logical_plan, session_state) + .await + } +} + +pub(crate) struct NDCPushDownPlanner { + pub(crate) default_schema: Option<Arc<String>>, + pub(crate) catalog: Arc<crate::catalog::OpenDDCatalogProvider>, +} + +#[async_trait] +impl ExtensionPlanner for NDCPushDownPlanner { + /// Create a physical plan for an extension node + async fn plan_extension( + &self, + _planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc<dyn ExecutionPlan>], + _session_state: &SessionState, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + if let Some(ndc_node) = node.as_any().downcast_ref::<NDCQuery>() { + assert_eq!(logical_inputs.len(), 0, "Inconsistent number of inputs"); + assert_eq!(physical_inputs.len(), 0, "Inconsistent number of inputs"); + let table = self + .catalog + .get( + self.default_schema.as_ref().map(|s| s.as_str()), + &ndc_node.table, + ) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "table provider not found for replace_table_scan: {}", + &ndc_node.table + )) + })?; + let model_source = table.source.as_ref().ok_or_else(|| { + DataFusionError::Plan(format!( + "model source should be configured for {}", + table.name + )) + })?; + let select_permission = table.select_permission.as_ref().ok_or_else(|| { + DataFusionError::Plan(format!( + "role {} does not have select permission for model {}", + table.session.role, table.name + )) + })?; + let type_permissions = table.type_permissions.as_ref().ok_or_else(|| { + DataFusionError::Plan(format!( + "role {} does not have permission to select any fields of model {}", + table.session.role, table.name + )) + })?; + let base_type_allowed_fields = &type_permissions + .permissions + .get(&table.data_type) + .ok_or_else(|| { + DataFusionError::Plan(format!( + "role {} has permission to select model {} but does not have permission \ + to select fields of the model's underlying type {}", + table.session.role, table.name, table.data_type + )) + })? 
+ .output + .allowed_fields; + for (field_name, _field) in ndc_node + .query + .query + .fields + .as_ref() + .unwrap_or(&IndexMap::new()) + { + let field_name = { + let field_name = Identifier::new(field_name.clone()).map_err(|e| { + DataFusionError::Internal(format!( + "field name conversion failed {field_name}: {e}" + )) + })?; + FieldName(field_name) + }; + if base_type_allowed_fields.contains(&field_name) { + Ok(()) + } else { + Err(DataFusionError::Plan(format!( + "role {} does not have permission to select the field {} from type {} of model {}", + table.session.role, field_name, table.data_type, table.name + ))) + }?; + } + + let mut usage_counts = execute::model_tracking::UsagesCounts::default(); + let mut relationships = BTreeMap::new(); + + let permission_filter = match &select_permission.filter { + FilterPermission::AllowAll => Ok(ndc_models::Expression::And { + expressions: vec![], + }), + FilterPermission::Filter(filter) => { + execute::ir::permissions::process_model_predicate( + filter, + &table.session.variables, + &mut relationships, + &mut usage_counts, + ) + .map_err(|e| { + DataFusionError::Internal(format!( + "error when processing model predicate: {e}" + )) + }) + } + }?; + + let relationships = relationships + .into_values() + .map(|v| { + process_model_relationship_definition(&v) + .map(|r| (v.relationship_name.to_string(), r)) + .map_err(|e| { + DataFusionError::Internal(format!( + "error constructing ndc relationship definition: {e}" + )) + }) + }) + .collect::, DataFusionError>>()?; + let mut query = ndc_node.query.clone(); + query.query.predicate = Some(permission_filter); + query.collection_relationships = relationships; + let ndc_pushdown = NDCPushDown::new( + table.http_context.clone(), + ndc_node.schema.inner().clone(), + Arc::new(query), + Arc::new(model_source.data_connector.clone()), + ); + Ok(Some(Arc::new(ndc_pushdown))) + } else { + Ok(None) + } + } +} diff --git a/v3/crates/sql/src/lib.rs b/v3/crates/sql/src/lib.rs new file mode 100644 index 00000000000..dc4f4b70c85 --- /dev/null +++ b/v3/crates/sql/src/lib.rs @@ -0,0 +1,3 @@ +pub mod catalog; +pub mod execute; +pub mod plan; diff --git a/v3/crates/sql/src/plan.rs b/v3/crates/sql/src/plan.rs new file mode 100644 index 00000000000..7e70add96b2 --- /dev/null +++ b/v3/crates/sql/src/plan.rs @@ -0,0 +1,257 @@ +use core::fmt; +use std::{any::Any, hash::Hash, sync::Arc}; + +use datafusion::{ + arrow::{ + array::RecordBatch, datatypes::SchemaRef, error::ArrowError, json::reader as arrow_json, + }, + common::DFSchemaRef, + error::DataFusionError, + logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore}, + physical_expr::EquivalenceProperties, + physical_plan::{ + stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionMode, + ExecutionPlan, Partitioning, PlanProperties, + }, + sql::TableReference, +}; +use execute::HttpContext; +use futures::TryFutureExt; +use tracing_util::{FutureExt, SpanVisibility, TraceableError}; + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ExecutionPlanError { + #[error("{0}")] + NDCExecutionError(#[from] execute::ndc::client::Error), + + #[error("NDC Response not as expected: {0}")] + NDCResponseFormat(String), + + #[error("Arrow error: {0}")] + ArrowError(#[from] ArrowError), + + #[error("Couldn't construct a RecordBatch: {0}")] + RecordBatchConstruction(String), + + #[error("Couldn't fetch otel tracing context")] + TracingContextNotFound, +} + +impl TraceableError for ExecutionPlanError { + fn visibility(&self) -> tracing_util::ErrorVisibility { + 
tracing_util::ErrorVisibility::Internal + } +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct NDCQuery { + pub(crate) table: TableReference, + pub(crate) query: ndc_models::QueryRequest, + pub(crate) data_source_name: Arc, + pub(crate) schema: DFSchemaRef, +} + +impl Hash for NDCQuery { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + self.data_source_name.hash(state); + format!("{:#?}", self.query).hash(state); + self.schema.hash(state); + } +} + +impl Eq for NDCQuery {} + +impl NDCQuery { + pub(crate) fn project( + mut self, + schema: DFSchemaRef, + projection: &[String], + ) -> datafusion::error::Result<Self> { + let mut current_fields = self.query.query.fields.take().ok_or_else(|| { + DataFusionError::Internal("empty fields found in NDC scan for projection".to_string()) + })?; + let new_fields = projection + .iter() + .map(|projected_field| { + current_fields + .swap_remove(projected_field) + .map(|field| (projected_field.clone(), field)) + .ok_or_else(|| { + DataFusionError::Internal( + "failed to look up projected field in NDC scan".to_string(), + ) + }) + }) + .collect::<datafusion::error::Result<_>>()?; + let _ = std::mem::replace(&mut self.query.query.fields, Some(new_fields)); + let _ = std::mem::replace(&mut self.schema, schema); + Ok(self) + } +} + +impl UserDefinedLogicalNodeCore for NDCQuery { + fn name(&self) -> &str { + "NDCQuery" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + + /// The schema is the projected schema computed for the NDC query + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec<datafusion::prelude::Expr> { + vec![] + } + + /// For example: `NDCQuery: query=...` + fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "NDCQuery: query={:#?}", self.query) + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec<datafusion::prelude::Expr>, + _inputs: Vec<LogicalPlan>, + ) -> datafusion::error::Result<Self> { + Ok(self.clone()) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct NDCPushDown { + http_context: Arc<HttpContext>, + query: Arc<ndc_models::QueryRequest>, + data_connector: Arc<metadata_resolve::DataConnectorLink>, + projected_schema: SchemaRef, + cache: PlanProperties, +} + +impl NDCPushDown { + pub(crate) fn new( + http_context: Arc<HttpContext>, + schema: SchemaRef, + query: Arc<ndc_models::QueryRequest>, + data_connector: Arc<metadata_resolve::DataConnectorLink>, + ) -> Self { + let cache = Self::compute_properties(schema.clone()); + Self { + http_context, + query, + data_connector, + projected_schema: schema, + cache, + } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
+ fn compute_properties(schema: SchemaRef) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + PlanProperties::new( + eq_properties, + Partitioning::UnknownPartitioning(1), + ExecutionMode::Bounded, + ) + } +} + +impl DisplayAs for NDCPushDown { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result { + write!(f, "NDCPushDown") + } +} + +impl ExecutionPlan for NDCPushDown { + fn name(&self) -> &'static str { + "NDCPushdown" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> datafusion::error::Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + context: Arc, + ) -> datafusion::error::Result { + let otel_cx = context + .session_config() + .get_extension::() + .ok_or_else(|| { + DataFusionError::External(Box::new(ExecutionPlanError::TracingContextNotFound)) + })?; + let fut = fetch_from_data_connector( + self.projected_schema.clone(), + self.http_context.clone(), + self.query.clone(), + self.data_connector.clone(), + ) + .with_context((*otel_cx).clone()) + .map_err(|e| DataFusionError::External(Box::new(e))); + let stream = futures::stream::once(fut); + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.projected_schema.clone(), + stream, + ))) + } +} + +pub async fn fetch_from_data_connector( + schema: SchemaRef, + http_context: Arc, + query_request: Arc, + data_connector: Arc, +) -> Result { + let tracer = tracing_util::global_tracer(); + let mut ndc_response = + execute::fetch_from_data_connector(&http_context, &query_request, &data_connector, None) + .await?; + let batch = tracer.in_span( + "ndc_response_to_record_batch", + "Converts NDC Response into datafusion's RecordBatch", + SpanVisibility::Internal, + || { + let rows = ndc_response + .0 + .pop() + .ok_or_else(|| { + ExecutionPlanError::NDCResponseFormat("no row sets found".to_string()) + })? + .rows + .ok_or_else(|| { + ExecutionPlanError::NDCResponseFormat( + "no rows found for the row set".to_string(), + ) + })?; + let mut decoder = arrow_json::ReaderBuilder::new(schema.clone()).build_decoder()?; + decoder.serialize(&rows)?; + decoder.flush()?.ok_or_else(|| { + ExecutionPlanError::RecordBatchConstruction( + "json to arrow decoder did not return any rows".to_string(), + ) + }) + }, + )?; + Ok(batch) +} diff --git a/v3/crates/utils/tracing-util/src/lib.rs b/v3/crates/utils/tracing-util/src/lib.rs index 49b2a959e97..4721a6b6dc9 100644 --- a/v3/crates/utils/tracing-util/src/lib.rs +++ b/v3/crates/utils/tracing-util/src/lib.rs @@ -18,6 +18,7 @@ pub use tracer::{ // risking mismatches and multiple globals pub use opentelemetry::propagation::text_map_propagator::TextMapPropagator; pub use opentelemetry::trace::get_active_span; +pub use opentelemetry::trace::FutureExt; pub use opentelemetry::trace::Status; pub use opentelemetry::Context; pub use opentelemetry_contrib::trace::propagator::trace_context_response::TraceContextResponsePropagator;
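
As a footnote on the execution path: `NDCPushDown::execute` decodes the JSON rows of an ndc-spec row set straight into an arrow `RecordBatch` via `arrow_json`'s `Decoder`, as shown in `fetch_from_data_connector` above. A minimal, self-contained sketch of just that decoding step (the two-column schema and sample rows are hypothetical):

```rust
use std::sync::Arc;

use datafusion::arrow::{
    array::RecordBatch,
    datatypes::{DataType, Field, Schema},
    error::ArrowError,
    json::reader::ReaderBuilder,
};
use serde_json::{json, Value};

// Decode JSON row objects (as found in an ndc-spec row set) into an arrow
// `RecordBatch`, the same way the NDC pushdown node does.
fn rows_to_record_batch(rows: &[Value]) -> Result<RecordBatch, ArrowError> {
    // Hypothetical projected schema for a two-column model.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("title", DataType::Utf8, true),
    ]));
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
    decoder.serialize(rows)?;
    decoder
        .flush()?
        .ok_or_else(|| ArrowError::JsonError("decoder returned no rows".to_string()))
}

fn main() -> Result<(), ArrowError> {
    let rows = vec![
        json!({ "id": 1, "title": "Hello" }),
        json!({ "id": 2, "title": null }),
    ];
    let batch = rows_to_record_batch(&rows)?;
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```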