diff --git a/Cargo.lock b/Cargo.lock index 209a1d69..65fabce6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,54 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "abi_stable" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d6512d3eb05ffe5004c59c206de7f99c34951504056ce23fc953842f12c445" +dependencies = [ + "abi_stable_derive", + "abi_stable_shared", + "const_panic", + "core_extensions", + "crossbeam-channel", + "generational-arena", + "libloading 0.7.4", + "lock_api", + "parking_lot", + "paste", + "repr_offset", + "rustc_version", + "serde", + "serde_derive", + "serde_json", +] + +[[package]] +name = "abi_stable_derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7178468b407a4ee10e881bc7a328a65e739f0863615cca4429d43916b05e898" +dependencies = [ + "abi_stable_shared", + "as_derive_utils", + "core_extensions", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", + "typed-arena", +] + +[[package]] +name = "abi_stable_shared" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b5df7688c123e63f4d4d649cba63f2967ba7f7861b1664fca3f77d3dad2b63" +dependencies = [ + "core_extensions", +] + [[package]] name = "addr2line" version = "0.24.2" @@ -210,6 +258,7 @@ dependencies = [ "arrow-schema", "arrow-select", "arrow-string", + "pyo3", ] [[package]] @@ -444,6 +493,18 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "as_derive_utils" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff3c96645900a44cf11941c111bd08a6573b0e2f9f69bc9264b179d8fae753c4" +dependencies = [ + "core_extensions", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "async-compression" version = "0.4.19" @@ -461,6 +522,15 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-ffi" +version = "0.5.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4de21c0feef7e5a556e51af767c953f0501f7f300ba785cc99c47bdc8081a50" +dependencies = [ + "abi_stable", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -1010,6 +1080,12 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "const_panic" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2459fc9262a1aa204eb4b5764ad4f189caec88aea9634389c0a25f8be7f6265e" + [[package]] name = "constant_time_eq" version = "0.3.1" @@ -1066,6 +1142,21 @@ dependencies = [ "libc", ] +[[package]] +name = "core_extensions" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92c71dc07c9721607e7a16108336048ee978c3a8b129294534272e8bac96c0ee" +dependencies = [ + "core_extensions_proc_macros", +] + +[[package]] +name = "core_extensions_proc_macros" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f3b219d28b6e3b4ac87bc1fc522e0803ab22e055da177bff0068c4150c61a6" + [[package]] name = "cpufeatures" version = "0.2.17" @@ -1331,6 +1422,7 @@ dependencies = [ "object_store", "parquet", "paste", + "pyo3", "recursive", "sqlparser", "tokio", @@ -1418,6 +1510,27 @@ dependencies = [ "futures", ] +[[package]] +name = "datafusion-ffi" +version = "45.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff47a79d442207c168c6e3e1d970c248589c148e4800e5b285ac1b2cb1a230f8" +dependencies = [ + "abi_stable", + "arrow", + "arrow-array", + "arrow-schema", + "async-ffi", + "async-trait", + "datafusion", + "datafusion-proto", + "futures", + "log", + "prost", + "semver", + "tokio", +] + [[package]] name = "datafusion-functions" version = "45.0.0" @@ -1792,6 +1905,19 @@ dependencies = [ "uuid", ] +[[package]] +name = "datafusion-table-providers-python" +version = "0.3.0" +dependencies = [ + "arrow", + "datafusion", + "datafusion-ffi", + "datafusion-table-providers", + 
"duckdb", + "pyo3", + "tokio", +] + [[package]] name = "deranged" version = "0.3.11" @@ -1836,7 +1962,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "330c60081dcc4c72131f8eb70510f1ac07223e5d4163db481a04a0befcffa412" dependencies = [ - "libloading", + "libloading 0.8.6", ] [[package]] @@ -2184,6 +2310,15 @@ dependencies = [ "slab", ] +[[package]] +name = "generational-arena" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877e94aff08e743b651baaea359664321055749b398adff8740a7399af7796e7" +dependencies = [ + "cfg-if", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2748,6 +2883,12 @@ dependencies = [ "serde", ] +[[package]] +name = "indoc" +version = "2.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" + [[package]] name = "inherent" version = "1.0.12" @@ -2969,6 +3110,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libloading" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "libloading" version = "0.8.6" @@ -3133,6 +3284,15 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.17" @@ -3920,6 +4080,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "portable-atomic" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" + [[package]] name = "postgres-native-tls" version = "0.5.1" @@ -4082,6 +4248,69 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "pyo3" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7778bffd85cf38175ac1f545509665d0b9b92a198ca7941f131f85f7a4f9a872" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "memoffset", + "once_cell", + "portable-atomic", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94f6cbe86ef3bf18998d9df6e0f3fc1050a8c5efa409bf712e661a4366e010fb" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9f1b4c431c0bb1c8fb0a338709859eed0d030ff6daa34368d3b152a63dfdd8d" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc2201328f63c4710f68abdf653c89d8dbc2858b88c5d88b0ff38a75288a9da" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.23.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fca6726ad0f3da9c9de093d6f116a93c1a38e417ed73bf138472cf4064f72028" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "pyo3-build-config", + "quote", + "syn 2.0.100", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -4277,6 +4506,15 @@ dependencies = [ "bytecheck", ] +[[package]] +name = "repr_offset" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb1070755bd29dffc19d0971cab794e607839ba2ef4b69a9e6fbc8733c1b72ea" +dependencies = 
[ + "tstr", +] + [[package]] name = "reqwest" version = "0.12.12" @@ -5045,6 +5283,12 @@ dependencies = [ "xattr", ] +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + [[package]] name = "tempfile" version = "3.18.0" @@ -5536,6 +5780,21 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tstr" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f8e0294f14baae476d0dd0a2d780b2e24d66e349a9de876f5126777a37bdba7" +dependencies = [ + "tstr_proc_macros", +] + +[[package]] +name = "tstr_proc_macros" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78122066b0cb818b8afd08f7ed22f7fdbc3e90815035726f0840d0d26c0747a" + [[package]] name = "twox-hash" version = "1.6.3" @@ -5552,6 +5811,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7b17f197b3050ba473acf9181f7b1d3b66d1cf7356c6cc57886662276e65908" +[[package]] +name = "typed-arena" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6af6ae20167a9ece4bcb41af5b80f8a1f1df981f6391189ce00fd257af04126a" + [[package]] name = "typenum" version = "1.18.0" @@ -5597,6 +5862,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" +[[package]] +name = "unindent" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7264e107f553ccae879d21fbea1d6724ac785e8c3bfc762137959b5802826ef3" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index f4f94969..0771ca06 100644 --- a/Cargo.toml 
+++ b/Cargo.toml @@ -1,5 +1,28 @@ -[package] -name = "datafusion-table-providers" +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[workspace] +members = [ + "core", + "python", +] +resolver = "2" + +[workspace.package] version = "0.3.0" readme = "README.md" edition = "2021" @@ -7,159 +30,22 @@ repository = "https://github.com/datafusion-contrib/datafusion-table-providers" license = "Apache-2.0" description = "Extend the capabilities of DataFusion to support additional data sources via implementations of the `TableProvider` trait." 
-[dependencies] +[workspace.dependencies] arrow = "54.2.1" -arrow-array = { version = "54.2.1", optional = true } -arrow-flight = { version = "54.2.1", optional = true, features = [ +arrow-array = { version = "54.2.1" } +arrow-flight = { version = "54.2.1", features = [ "flight-sql-experimental", "tls", ] } -arrow-schema = { version = "54.2.1", optional = true, features = ["serde"] } +arrow-schema = { version = "54.2.1", features = ["serde"] } arrow-json = "54.2.1" -arrow-odbc = { version = "=15.1.1", optional = true } -async-stream = { version = "0.3", optional = true } -async-trait = "0.1" -base64 = { version = "0.22.1", optional = true } -bb8 = { version = "0.9", optional = true } -bb8-postgres = { version = "0.9", optional = true } -bigdecimal = "0.4" -byteorder = "1.5.0" -bytes = { version = "1.7.1", optional = true } -byte-unit = { version = "5.1.4", optional = true } -chrono = "0.4" -dashmap = "6.1.0" +arrow-odbc = { version = "=15.1.1" } datafusion = { version = "45", default-features = false } -datafusion-expr = { version = "45", optional = true } -datafusion-federation = { version = "=0.3.6", features = [ - "sql", -], optional = true } -datafusion-physical-expr = { version = "45", optional = true } -datafusion-physical-plan = { version = "45", optional = true } -datafusion-proto = { version = "45", optional = true } -duckdb = { version = "=1.2.1", features = [ - "bundled", - "r2d2", - "vtab", - "vtab-arrow", - "appender-arrow", -], optional = true } -libduckdb-sys = { version = "=1.2.1", optional = true } -dyn-clone = { version = "1.0", optional = true } -fallible-iterator = "0.3.0" -fundu = "2.0.1" -futures = "0.3" -geo-types = "0.7" -itertools = "0.14.0" -mysql_async = { version = "0.35", features = [ - "native-tls-tls", - "chrono", - "time", - "bigdecimal", -], optional = true } -native-tls = { version = "0.2", optional = true } -num-bigint = "0.4" -odbc-api = { version = "11.1", optional = true } -pem = { version = "3.0.4", optional = true } 
-postgres-native-tls = { version = "0.5.0", optional = true } -prost = { version = "0.13", optional = true } -rand = { version = "0.9" } -r2d2 = { version = "0.8", optional = true } -rusqlite = { version = "0.32", optional = true } -sea-query = { version = "0.32", features = [ - "backend-sqlite", - "backend-postgres", - "postgres-array", - "with-rust_decimal", - "with-bigdecimal", - "with-time", - "with-chrono", -] } -secrecy = "0.10.3" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -sha2 = "0.10" -snafu = "0.8" -time = "0.3" -tokio = { version = "1.44", features = ["macros", "fs"] } -tokio-postgres = { version = "0.7", features = [ - "with-chrono-0_4", - "with-uuid-1", - "with-serde_json-1", - "with-geo-types-0_7", -], optional = true } -tokio-rusqlite = { version = "0.6.0", optional = true } -tonic = { version = "0.12", optional = true, features = [ - "tls-native-roots", - "tls-webpki-roots", -] } -tracing = "0.1" -trust-dns-resolver = "0.23.2" -url = "2.5.4" -uuid = { version = "1.11", optional = true } - -[dev-dependencies] -anyhow = "1.0" -bollard = "0.18.1" -geozero = { version = "0.14.0", features = ["with-wkb"] } -insta = { version = "1.42.0", features = ["filters"] } -prost = { version = "0.13" } -rand = "0.9" -reqwest = "0.12" -rstest = "0.24.0" -test-log = { version = "0.2", features = ["trace"] } -tokio-stream = { version = "0.1", features = ["net"] } -tracing-subscriber = { version = "0.3", features = ["env-filter"] } - -[features] -duckdb = [ - "dep:duckdb", - "dep:r2d2", - "dep:uuid", - "dep:dyn-clone", - "dep:async-stream", - "dep:arrow-schema", - "dep:byte-unit", -] -duckdb-federation = ["duckdb", "federation"] -federation = ["dep:datafusion-federation"] -flight = [ - "dep:arrow-flight", - "datafusion/serde", - "dep:datafusion-proto", - "dep:prost", - "dep:tonic", -] -mysql = ["dep:mysql_async", "dep:async-stream"] -mysql-federation = ["mysql", "federation"] -odbc = ["dep:odbc-api", "dep:arrow-odbc", "dep:async-stream", 
"dep:dyn-clone"] -odbc-federation = ["odbc", "federation"] -postgres = [ - "dep:tokio-postgres", - "dep:uuid", - "dep:postgres-native-tls", - "dep:bb8", - "dep:bb8-postgres", - "dep:native-tls", - "dep:pem", - "dep:async-stream", - "dep:arrow-schema", -] -postgres-federation = ["postgres", "federation"] -sqlite = ["dep:rusqlite", "dep:tokio-rusqlite", "dep:arrow-schema"] -sqlite-federation = ["sqlite", "federation"] -sqlite-bundled = ["sqlite", "rusqlite/bundled"] - -[[example]] -name = "odbc_sqlite" -path = "examples/odbc_sqlite.rs" -required-features = ["sqlite", "odbc"] - -[[example]] -name = "flight-sql" -path = "examples/flight-sql.rs" -required-features = ["flight"] - -[[example]] -name = "sqlite" -path = "examples/sqlite.rs" -required-features = ["sqlite"] +datafusion-expr = { version = "45" } +datafusion-federation = { version = "=0.3.6" } +datafusion-ffi = { version = "45" } +datafusion-proto = { version = "45" } +datafusion-physical-expr = { version = "45" } +datafusion-physical-plan = { version = "45" } +datafusion-table-providers = { path = "core" } +duckdb = { version = "=1.2.1" } diff --git a/Makefile b/Makefile index bf3d0733..f352c9f1 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ all: .PHONY: test test: - cargo test --features duckdb-federation,flight,mysql-federation,postgres-federation,sqlite-federation + cargo test --features duckdb-federation,flight,mysql-federation,postgres-federation,sqlite-federation -p datafusion-table-providers --lib .PHONY: lint lint: diff --git a/core/Cargo.toml b/core/Cargo.toml new file mode 100644 index 00000000..931b393f --- /dev/null +++ b/core/Cargo.toml @@ -0,0 +1,165 @@ +[package] +name = "datafusion-table-providers" +version = { workspace = true} +readme = { workspace = true } +edition = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +description = { workspace = true } + +[dependencies] +arrow = { workspace = true } +arrow-array = { workspace = true, optional = true 
} +arrow-flight = { workspace = true, optional = true, features = [ + "flight-sql-experimental", + "tls", +] } +arrow-schema = { workspace = true, optional = true, features = ["serde"] } +arrow-json = { workspace = true } +arrow-odbc = { workspace = true, optional = true } +async-stream = { version = "0.3", optional = true } +async-trait = "0.1" +base64 = { version = "0.22.1", optional = true } +bb8 = { version = "0.9", optional = true } +bb8-postgres = { version = "0.9", optional = true } +bigdecimal = "0.4" +byteorder = "1.5.0" +bytes = { version = "1.7.1", optional = true } +byte-unit = { version = "5.1.4", optional = true } +chrono = "0.4" +dashmap = "6.1.0" +datafusion = { workspace = true, default-features = false } +datafusion-expr = { workspace = true, optional = true } +datafusion-federation = { workspace = true, features = [ + "sql", +], optional = true } +datafusion-physical-expr = { workspace = true, optional = true } +datafusion-physical-plan = { workspace = true, optional = true } +datafusion-proto = { workspace = true, optional = true } +duckdb = { workspace = true, features = [ + "bundled", + "r2d2", + "vtab", + "vtab-arrow", + "appender-arrow", +], optional = true } +libduckdb-sys = { version = "=1.2.1", optional = true } +dyn-clone = { version = "1.0", optional = true } +fallible-iterator = "0.3.0" +fundu = "2.0.1" +futures = "0.3" +geo-types = "0.7" +itertools = "0.14.0" +mysql_async = { version = "0.35", features = [ + "native-tls-tls", + "chrono", + "time", + "bigdecimal", +], optional = true } +native-tls = { version = "0.2", optional = true } +num-bigint = "0.4" +odbc-api = { version = "11.1", optional = true } +pem = { version = "3.0.4", optional = true } +postgres-native-tls = { version = "0.5.0", optional = true } +prost = { version = "0.13", optional = true } +rand = { version = "0.9" } +r2d2 = { version = "0.8", optional = true } +rusqlite = { version = "0.32", optional = true } +sea-query = { version = "0.32", features = [ + 
"backend-sqlite", + "backend-postgres", + "postgres-array", + "with-rust_decimal", + "with-bigdecimal", + "with-time", + "with-chrono", +] } +secrecy = "0.10.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +sha2 = "0.10" +snafu = "0.8" +time = "0.3" +tokio = { version = "1.43", features = ["macros", "fs"] } +tokio-postgres = { version = "0.7", features = [ + "with-chrono-0_4", + "with-uuid-1", + "with-serde_json-1", + "with-geo-types-0_7", +], optional = true } +tokio-rusqlite = { version = "0.6.0", optional = true } +tonic = { version = "0.12", optional = true, features = [ + "tls-native-roots", + "tls-webpki-roots", +] } +tracing = "0.1" +trust-dns-resolver = "0.23.2" +url = "2.5.4" +uuid = { version = "1.11", optional = true } + +[dev-dependencies] +anyhow = "1.0" +bollard = "0.18.1" +geozero = { version = "0.14.0", features = ["with-wkb"] } +insta = { version = "1.42.0", features = ["filters"] } +prost = { version = "0.13" } +rand = "0.9" +reqwest = "0.12" +rstest = "0.24.0" +test-log = { version = "0.2", features = ["trace"] } +tokio-stream = { version = "0.1", features = ["net"] } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +[features] +duckdb = [ + "dep:duckdb", + "dep:r2d2", + "dep:uuid", + "dep:dyn-clone", + "dep:async-stream", + "dep:arrow-schema", + "dep:byte-unit", +] +duckdb-federation = ["duckdb", "federation"] +federation = ["dep:datafusion-federation"] +flight = [ + "dep:arrow-flight", + "datafusion/serde", + "dep:datafusion-proto", + "dep:prost", + "dep:tonic", +] +mysql = ["dep:mysql_async", "dep:async-stream"] +mysql-federation = ["mysql", "federation"] +odbc = ["dep:odbc-api", "dep:arrow-odbc", "dep:async-stream", "dep:dyn-clone"] +odbc-federation = ["odbc", "federation"] +postgres = [ + "dep:tokio-postgres", + "dep:uuid", + "dep:postgres-native-tls", + "dep:bb8", + "dep:bb8-postgres", + "dep:native-tls", + "dep:pem", + "dep:async-stream", + "dep:arrow-schema", +] +postgres-federation = 
["postgres", "federation"] +sqlite = ["dep:rusqlite", "dep:tokio-rusqlite", "dep:arrow-schema"] +sqlite-federation = ["sqlite", "federation"] +sqlite-bundled = ["sqlite", "rusqlite/bundled"] + +[[example]] +name = "odbc_sqlite" +path = "examples/odbc_sqlite.rs" +required-features = ["sqlite", "odbc"] + +[[example]] +name = "flight-sql" +path = "examples/flight-sql.rs" +required-features = ["flight"] + +[[example]] +name = "sqlite" +path = "examples/sqlite.rs" +required-features = ["sqlite"] \ No newline at end of file diff --git a/examples/duckdb.rs b/core/examples/duckdb.rs similarity index 100% rename from examples/duckdb.rs rename to core/examples/duckdb.rs diff --git a/examples/duckdb_example.db b/core/examples/duckdb_example.db similarity index 100% rename from examples/duckdb_example.db rename to core/examples/duckdb_example.db diff --git a/examples/duckdb_external_table.rs b/core/examples/duckdb_external_table.rs similarity index 100% rename from examples/duckdb_external_table.rs rename to core/examples/duckdb_external_table.rs diff --git a/examples/duckdb_function.rs b/core/examples/duckdb_function.rs similarity index 100% rename from examples/duckdb_function.rs rename to core/examples/duckdb_function.rs diff --git a/examples/flight-sql.rs b/core/examples/flight-sql.rs similarity index 100% rename from examples/flight-sql.rs rename to core/examples/flight-sql.rs diff --git a/examples/mysql.rs b/core/examples/mysql.rs similarity index 100% rename from examples/mysql.rs rename to core/examples/mysql.rs diff --git a/examples/odbc_sqlite.rs b/core/examples/odbc_sqlite.rs similarity index 100% rename from examples/odbc_sqlite.rs rename to core/examples/odbc_sqlite.rs diff --git a/examples/postgres.rs b/core/examples/postgres.rs similarity index 100% rename from examples/postgres.rs rename to core/examples/postgres.rs diff --git a/examples/sqlite.rs b/core/examples/sqlite.rs similarity index 100% rename from examples/sqlite.rs rename to core/examples/sqlite.rs 
diff --git a/examples/sqlite_example.db b/core/examples/sqlite_example.db similarity index 100% rename from examples/sqlite_example.db rename to core/examples/sqlite_example.db diff --git a/src/common.rs b/core/src/common.rs similarity index 100% rename from src/common.rs rename to core/src/common.rs diff --git a/src/duckdb.rs b/core/src/duckdb.rs similarity index 100% rename from src/duckdb.rs rename to core/src/duckdb.rs diff --git a/src/duckdb/creator.rs b/core/src/duckdb/creator.rs similarity index 100% rename from src/duckdb/creator.rs rename to core/src/duckdb/creator.rs diff --git a/src/duckdb/federation.rs b/core/src/duckdb/federation.rs similarity index 100% rename from src/duckdb/federation.rs rename to core/src/duckdb/federation.rs diff --git a/src/duckdb/sql_table.rs b/core/src/duckdb/sql_table.rs similarity index 100% rename from src/duckdb/sql_table.rs rename to core/src/duckdb/sql_table.rs diff --git a/src/duckdb/write.rs b/core/src/duckdb/write.rs similarity index 100% rename from src/duckdb/write.rs rename to core/src/duckdb/write.rs diff --git a/src/flight.rs b/core/src/flight.rs similarity index 100% rename from src/flight.rs rename to core/src/flight.rs diff --git a/src/flight/codec.rs b/core/src/flight/codec.rs similarity index 100% rename from src/flight/codec.rs rename to core/src/flight/codec.rs diff --git a/src/flight/exec.rs b/core/src/flight/exec.rs similarity index 100% rename from src/flight/exec.rs rename to core/src/flight/exec.rs diff --git a/src/flight/sql.rs b/core/src/flight/sql.rs similarity index 100% rename from src/flight/sql.rs rename to core/src/flight/sql.rs diff --git a/src/lib.rs b/core/src/lib.rs similarity index 100% rename from src/lib.rs rename to core/src/lib.rs diff --git a/src/mysql.rs b/core/src/mysql.rs similarity index 100% rename from src/mysql.rs rename to core/src/mysql.rs diff --git a/src/mysql/federation.rs b/core/src/mysql/federation.rs similarity index 100% rename from src/mysql/federation.rs rename to 
core/src/mysql/federation.rs diff --git a/src/mysql/mysql_window.rs b/core/src/mysql/mysql_window.rs similarity index 100% rename from src/mysql/mysql_window.rs rename to core/src/mysql/mysql_window.rs diff --git a/src/mysql/sql_table.rs b/core/src/mysql/sql_table.rs similarity index 100% rename from src/mysql/sql_table.rs rename to core/src/mysql/sql_table.rs diff --git a/src/mysql/write.rs b/core/src/mysql/write.rs similarity index 100% rename from src/mysql/write.rs rename to core/src/mysql/write.rs diff --git a/src/odbc.rs b/core/src/odbc.rs similarity index 100% rename from src/odbc.rs rename to core/src/odbc.rs diff --git a/src/postgres.rs b/core/src/postgres.rs similarity index 100% rename from src/postgres.rs rename to core/src/postgres.rs diff --git a/src/postgres/write.rs b/core/src/postgres/write.rs similarity index 100% rename from src/postgres/write.rs rename to core/src/postgres/write.rs diff --git a/src/sql/arrow_sql_gen/arrow.rs b/core/src/sql/arrow_sql_gen/arrow.rs similarity index 100% rename from src/sql/arrow_sql_gen/arrow.rs rename to core/src/sql/arrow_sql_gen/arrow.rs diff --git a/src/sql/arrow_sql_gen/mod.rs b/core/src/sql/arrow_sql_gen/mod.rs similarity index 100% rename from src/sql/arrow_sql_gen/mod.rs rename to core/src/sql/arrow_sql_gen/mod.rs diff --git a/src/sql/arrow_sql_gen/mysql.rs b/core/src/sql/arrow_sql_gen/mysql.rs similarity index 100% rename from src/sql/arrow_sql_gen/mysql.rs rename to core/src/sql/arrow_sql_gen/mysql.rs diff --git a/src/sql/arrow_sql_gen/postgres.rs b/core/src/sql/arrow_sql_gen/postgres.rs similarity index 100% rename from src/sql/arrow_sql_gen/postgres.rs rename to core/src/sql/arrow_sql_gen/postgres.rs diff --git a/src/sql/arrow_sql_gen/postgres/builder.rs b/core/src/sql/arrow_sql_gen/postgres/builder.rs similarity index 100% rename from src/sql/arrow_sql_gen/postgres/builder.rs rename to core/src/sql/arrow_sql_gen/postgres/builder.rs diff --git a/src/sql/arrow_sql_gen/postgres/composite.rs 
b/core/src/sql/arrow_sql_gen/postgres/composite.rs similarity index 100% rename from src/sql/arrow_sql_gen/postgres/composite.rs rename to core/src/sql/arrow_sql_gen/postgres/composite.rs diff --git a/src/sql/arrow_sql_gen/postgres/schema.rs b/core/src/sql/arrow_sql_gen/postgres/schema.rs similarity index 100% rename from src/sql/arrow_sql_gen/postgres/schema.rs rename to core/src/sql/arrow_sql_gen/postgres/schema.rs diff --git a/src/sql/arrow_sql_gen/sqlite.rs b/core/src/sql/arrow_sql_gen/sqlite.rs similarity index 100% rename from src/sql/arrow_sql_gen/sqlite.rs rename to core/src/sql/arrow_sql_gen/sqlite.rs diff --git a/src/sql/arrow_sql_gen/statement.rs b/core/src/sql/arrow_sql_gen/statement.rs similarity index 100% rename from src/sql/arrow_sql_gen/statement.rs rename to core/src/sql/arrow_sql_gen/statement.rs diff --git a/src/sql/db_connection_pool/dbconnection.rs b/core/src/sql/db_connection_pool/dbconnection.rs similarity index 100% rename from src/sql/db_connection_pool/dbconnection.rs rename to core/src/sql/db_connection_pool/dbconnection.rs diff --git a/src/sql/db_connection_pool/dbconnection/duckdbconn.rs b/core/src/sql/db_connection_pool/dbconnection/duckdbconn.rs similarity index 88% rename from src/sql/db_connection_pool/dbconnection/duckdbconn.rs rename to core/src/sql/db_connection_pool/dbconnection/duckdbconn.rs index 9ed35451..c3c4d713 100644 --- a/src/sql/db_connection_pool/dbconnection/duckdbconn.rs +++ b/core/src/sql/db_connection_pool/dbconnection/duckdbconn.rs @@ -1,5 +1,5 @@ use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use arrow::array::RecordBatch; use arrow_schema::{DataType, Field}; @@ -18,6 +18,7 @@ use duckdb::{Connection, DuckdbConnectionManager}; use dyn_clone::DynClone; use rand::distr::{Alphanumeric, SampleString}; use snafu::{prelude::*, ResultExt}; +use tokio::runtime::{Handle, Runtime}; use tokio::sync::mpsc::Sender; use crate::util::schema::SchemaValidator; @@ -281,6 +282,13 @@ impl DbConnection, 
DuckDBParamet } } +fn get_tokio_runtime() -> &'static Runtime { + // TODO: this function is a repetition of python/src/utils.rs::get_tokio_runtime. + // Think about how to refactor it + static RUNTIME: OnceLock<Runtime> = OnceLock::new(); + RUNTIME.get_or_init(|| Runtime::new().expect("Failed to create Tokio runtime")) +} + impl SyncDbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBParameter> for DuckDbConnection { @@ -395,48 +403,58 @@ impl SyncDbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBPar let cloned_schema = schema.clone(); let attachments = self.attachments.clone(); - let join_handle = tokio::task::spawn_blocking(move || { - Self::attach(&conn, &attachments)?; // this attach could happen when we clone the connection, but we can't detach after the thread closes because the connection isn't thread safe - let mut stmt = conn.prepare(&sql).context(DuckDBQuerySnafu)?; - let params: &[&dyn ToSql] = &params - .iter() - .map(|f| f.as_input_parameter()) - .collect::<Vec<_>>(); - let result: duckdb::ArrowStream<'_> = stmt - .stream_arrow(params, cloned_schema) - .context(DuckDBQuerySnafu)?; - for i in result { - blocking_channel_send(&batch_tx, i)?; - } + let create_stream = || -> Result<SendableRecordBatchStream> { + let join_handle = tokio::task::spawn_blocking(move || { + Self::attach(&conn, &attachments)?; // this attach could happen when we clone the connection, but we can't detach after the thread closes because the connection isn't thread safe + let mut stmt = conn.prepare(&sql).context(DuckDBQuerySnafu)?; + let params: &[&dyn ToSql] = &params + .iter() + .map(|f| f.as_input_parameter()) + .collect::<Vec<_>>(); + let result: duckdb::ArrowStream<'_> = stmt + .stream_arrow(params, cloned_schema) + .context(DuckDBQuerySnafu)?; + for i in result { + blocking_channel_send(&batch_tx, i)?; + } - Self::detach(&conn, &attachments)?; - Ok::<_, Box<dyn std::error::Error + Send + Sync>>(()) - }); + Self::detach(&conn, &attachments)?; + Ok::<_, Box<dyn std::error::Error + Send + Sync>>(()) + }); - let output_stream = stream! { - while let Some(batch) = batch_rx.recv().await { - yield Ok(batch); - } + let output_stream = stream! 
{ + while let Some(batch) = batch_rx.recv().await { + yield Ok(batch); + } - match join_handle.await { - Ok(Err(task_error)) => { - yield Err(DataFusionError::Execution(format!( - "Failed to execute DuckDB query: {task_error}" - ))) - }, - Err(join_error) => { - yield Err(DataFusionError::Execution(format!( - "Failed to execute DuckDB query: {join_error}" - ))) - }, - _ => {} - } + match join_handle.await { + Ok(Err(task_error)) => { + yield Err(DataFusionError::Execution(format!( + "Failed to execute DuckDB query: {task_error}" + ))) + }, + Err(join_error) => { + yield Err(DataFusionError::Execution(format!( + "Failed to execute DuckDB query: {join_error}" + ))) + }, + _ => {} + } + }; + + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + output_stream, + ))) }; - Ok(Box::pin(RecordBatchStreamAdapter::new( - schema, - output_stream, - ))) + // If calling directly from Rust, there is already tokio runtime so no + // additional work is needed. If calling from Python FFI, there's no existing + // tokio runtime, so we need to start a new one. 
+ match Handle::try_current() { + Ok(_) => create_stream(), + Err(_) => get_tokio_runtime().block_on(async { create_stream() }), + } } fn execute(&self, sql: &str, params: &[DuckDBParameter]) -> Result { diff --git a/src/sql/db_connection_pool/dbconnection/mysqlconn.rs b/core/src/sql/db_connection_pool/dbconnection/mysqlconn.rs similarity index 100% rename from src/sql/db_connection_pool/dbconnection/mysqlconn.rs rename to core/src/sql/db_connection_pool/dbconnection/mysqlconn.rs diff --git a/src/sql/db_connection_pool/dbconnection/odbcconn.rs b/core/src/sql/db_connection_pool/dbconnection/odbcconn.rs similarity index 100% rename from src/sql/db_connection_pool/dbconnection/odbcconn.rs rename to core/src/sql/db_connection_pool/dbconnection/odbcconn.rs diff --git a/src/sql/db_connection_pool/dbconnection/postgresconn.rs b/core/src/sql/db_connection_pool/dbconnection/postgresconn.rs similarity index 100% rename from src/sql/db_connection_pool/dbconnection/postgresconn.rs rename to core/src/sql/db_connection_pool/dbconnection/postgresconn.rs diff --git a/src/sql/db_connection_pool/dbconnection/sqliteconn.rs b/core/src/sql/db_connection_pool/dbconnection/sqliteconn.rs similarity index 100% rename from src/sql/db_connection_pool/dbconnection/sqliteconn.rs rename to core/src/sql/db_connection_pool/dbconnection/sqliteconn.rs diff --git a/src/sql/db_connection_pool/duckdbpool.rs b/core/src/sql/db_connection_pool/duckdbpool.rs similarity index 100% rename from src/sql/db_connection_pool/duckdbpool.rs rename to core/src/sql/db_connection_pool/duckdbpool.rs diff --git a/src/sql/db_connection_pool/mod.rs b/core/src/sql/db_connection_pool/mod.rs similarity index 100% rename from src/sql/db_connection_pool/mod.rs rename to core/src/sql/db_connection_pool/mod.rs diff --git a/src/sql/db_connection_pool/mysqlpool.rs b/core/src/sql/db_connection_pool/mysqlpool.rs similarity index 100% rename from src/sql/db_connection_pool/mysqlpool.rs rename to 
core/src/sql/db_connection_pool/mysqlpool.rs diff --git a/src/sql/db_connection_pool/odbcpool.rs b/core/src/sql/db_connection_pool/odbcpool.rs similarity index 100% rename from src/sql/db_connection_pool/odbcpool.rs rename to core/src/sql/db_connection_pool/odbcpool.rs diff --git a/src/sql/db_connection_pool/postgrespool.rs b/core/src/sql/db_connection_pool/postgrespool.rs similarity index 100% rename from src/sql/db_connection_pool/postgrespool.rs rename to core/src/sql/db_connection_pool/postgrespool.rs diff --git a/src/sql/db_connection_pool/sqlitepool.rs b/core/src/sql/db_connection_pool/sqlitepool.rs similarity index 100% rename from src/sql/db_connection_pool/sqlitepool.rs rename to core/src/sql/db_connection_pool/sqlitepool.rs diff --git a/src/sql/mod.rs b/core/src/sql/mod.rs similarity index 100% rename from src/sql/mod.rs rename to core/src/sql/mod.rs diff --git a/src/sql/sql_provider_datafusion/federation.rs b/core/src/sql/sql_provider_datafusion/federation.rs similarity index 100% rename from src/sql/sql_provider_datafusion/federation.rs rename to core/src/sql/sql_provider_datafusion/federation.rs diff --git a/src/sql/sql_provider_datafusion/mod.rs b/core/src/sql/sql_provider_datafusion/mod.rs similarity index 100% rename from src/sql/sql_provider_datafusion/mod.rs rename to core/src/sql/sql_provider_datafusion/mod.rs diff --git a/src/sqlite.rs b/core/src/sqlite.rs similarity index 100% rename from src/sqlite.rs rename to core/src/sqlite.rs diff --git a/src/sqlite/federation.rs b/core/src/sqlite/federation.rs similarity index 100% rename from src/sqlite/federation.rs rename to core/src/sqlite/federation.rs diff --git a/src/sqlite/sql_table.rs b/core/src/sqlite/sql_table.rs similarity index 100% rename from src/sqlite/sql_table.rs rename to core/src/sqlite/sql_table.rs diff --git a/src/sqlite/sqlite_interval.rs b/core/src/sqlite/sqlite_interval.rs similarity index 100% rename from src/sqlite/sqlite_interval.rs rename to core/src/sqlite/sqlite_interval.rs 
diff --git a/src/sqlite/write.rs b/core/src/sqlite/write.rs similarity index 100% rename from src/sqlite/write.rs rename to core/src/sqlite/write.rs diff --git a/src/util/column_reference.rs b/core/src/util/column_reference.rs similarity index 100% rename from src/util/column_reference.rs rename to core/src/util/column_reference.rs diff --git a/src/util/constraints.rs b/core/src/util/constraints.rs similarity index 100% rename from src/util/constraints.rs rename to core/src/util/constraints.rs diff --git a/src/util/indexes.rs b/core/src/util/indexes.rs similarity index 100% rename from src/util/indexes.rs rename to core/src/util/indexes.rs diff --git a/src/util/mod.rs b/core/src/util/mod.rs similarity index 100% rename from src/util/mod.rs rename to core/src/util/mod.rs diff --git a/src/util/ns_lookup.rs b/core/src/util/ns_lookup.rs similarity index 100% rename from src/util/ns_lookup.rs rename to core/src/util/ns_lookup.rs diff --git a/src/util/on_conflict.rs b/core/src/util/on_conflict.rs similarity index 100% rename from src/util/on_conflict.rs rename to core/src/util/on_conflict.rs diff --git a/src/util/retriable_error.rs b/core/src/util/retriable_error.rs similarity index 100% rename from src/util/retriable_error.rs rename to core/src/util/retriable_error.rs diff --git a/src/util/schema.rs b/core/src/util/schema.rs similarity index 100% rename from src/util/schema.rs rename to core/src/util/schema.rs diff --git a/src/util/secrets.rs b/core/src/util/secrets.rs similarity index 100% rename from src/util/secrets.rs rename to core/src/util/secrets.rs diff --git a/src/util/test.rs b/core/src/util/test.rs similarity index 100% rename from src/util/test.rs rename to core/src/util/test.rs diff --git a/tests/arrow_record_batch_gen/mod.rs b/core/tests/arrow_record_batch_gen/mod.rs similarity index 100% rename from tests/arrow_record_batch_gen/mod.rs rename to core/tests/arrow_record_batch_gen/mod.rs diff --git a/tests/docker/mod.rs b/core/tests/docker/mod.rs similarity 
index 100% rename from tests/docker/mod.rs rename to core/tests/docker/mod.rs diff --git a/tests/duckdb/mod.rs b/core/tests/duckdb/mod.rs similarity index 100% rename from tests/duckdb/mod.rs rename to core/tests/duckdb/mod.rs diff --git a/tests/flight/mod.rs b/core/tests/flight/mod.rs similarity index 100% rename from tests/flight/mod.rs rename to core/tests/flight/mod.rs diff --git a/tests/integration.rs b/core/tests/integration.rs similarity index 100% rename from tests/integration.rs rename to core/tests/integration.rs diff --git a/tests/mysql/common.rs b/core/tests/mysql/common.rs similarity index 100% rename from tests/mysql/common.rs rename to core/tests/mysql/common.rs diff --git a/tests/mysql/mod.rs b/core/tests/mysql/mod.rs similarity index 100% rename from tests/mysql/mod.rs rename to core/tests/mysql/mod.rs diff --git a/tests/postgres/common.rs b/core/tests/postgres/common.rs similarity index 100% rename from tests/postgres/common.rs rename to core/tests/postgres/common.rs diff --git a/tests/postgres/mod.rs b/core/tests/postgres/mod.rs similarity index 100% rename from tests/postgres/mod.rs rename to core/tests/postgres/mod.rs diff --git a/tests/postgres/schema.rs b/core/tests/postgres/schema.rs similarity index 100% rename from tests/postgres/schema.rs rename to core/tests/postgres/schema.rs diff --git a/tests/postgres/scripts/complex_table.sql b/core/tests/postgres/scripts/complex_table.sql similarity index 100% rename from tests/postgres/scripts/complex_table.sql rename to core/tests/postgres/scripts/complex_table.sql diff --git a/tests/postgres/snapshots/integration__postgres__schema__postgres_schema_inference_complex_types.snap b/core/tests/postgres/snapshots/integration__postgres__schema__postgres_schema_inference_complex_types.snap similarity index 100% rename from tests/postgres/snapshots/integration__postgres__schema__postgres_schema_inference_complex_types.snap rename to 
core/tests/postgres/snapshots/integration__postgres__schema__postgres_schema_inference_complex_types.snap diff --git a/tests/sqlite/mod.rs b/core/tests/sqlite/mod.rs similarity index 100% rename from tests/sqlite/mod.rs rename to core/tests/sqlite/mod.rs diff --git a/python/.cargo/config.toml b/python/.cargo/config.toml new file mode 100644 index 00000000..91a099a6 --- /dev/null +++ b/python/.cargo/config.toml @@ -0,0 +1,12 @@ +[target.x86_64-apple-darwin] +rustflags = [ + "-C", "link-arg=-undefined", + "-C", "link-arg=dynamic_lookup", +] + +[target.aarch64-apple-darwin] +rustflags = [ + "-C", "link-arg=-undefined", + "-C", "link-arg=dynamic_lookup", +] + diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 00000000..9ea5de3e --- /dev/null +++ b/python/.gitignore @@ -0,0 +1,26 @@ +/venv +.idea +.DS_Store +.vscode + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Python dist ignore +dist + +# C extensions +*.so + +# Python dist +dist + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +.python-version +venv +.venv + diff --git a/python/Cargo.toml b/python/Cargo.toml new file mode 100644 index 00000000..9cc5ee89 --- /dev/null +++ b/python/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "datafusion-table-providers-python" +version = { workspace = true } +readme = { workspace = true } +edition = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +description = { workspace = true } +publish = false + +[lib] +name = "datafusion_table_providers" +crate-type = ["cdylib"] +doc = false + +[dependencies] +arrow = { workspace = true } +datafusion = { workspace = true, features = ["pyarrow"] } +datafusion-ffi = { workspace = true } +datafusion-table-providers = { workspace = true, features = ["sqlite", "duckdb"] } +pyo3 = { version = "0.23" } +tokio = { version = "1.42", 
features = ["macros", "rt", "rt-multi-thread", "sync"] } +duckdb = { workspace = true } diff --git a/python/examples/duckdb_demo.py b/python/examples/duckdb_demo.py new file mode 100644 index 00000000..49546427 --- /dev/null +++ b/python/examples/duckdb_demo.py @@ -0,0 +1,11 @@ +from datafusion import SessionContext +from datafusion_table_providers import duckdb + +ctx = SessionContext() +pool = duckdb.DuckDBTableFactory("../../core/examples/duckdb_example.db", duckdb.AccessMode.ReadOnly) +tables = pool.tables() + +for t in tables: + ctx.register_table_provider(t, pool.get_table(t)) + print("Checking table:", t) + ctx.table(t).show() diff --git a/python/examples/sqlite_demo.py b/python/examples/sqlite_demo.py new file mode 100644 index 00000000..fd1b376b --- /dev/null +++ b/python/examples/sqlite_demo.py @@ -0,0 +1,11 @@ +from datafusion import SessionContext +from datafusion_table_providers import sqlite + +ctx = SessionContext() +pool = sqlite.SqliteTableFactory("../../core/examples/sqlite_example.db", "file", 3.0, None) +tables = pool.tables() + +for t in tables: + ctx.register_table_provider(t, pool.get_table(t)) + print("Checking table:", t) + ctx.table(t).show() diff --git a/python/pyproject.toml b/python/pyproject.toml new file mode 100644 index 00000000..66c5032d --- /dev/null +++ b/python/pyproject.toml @@ -0,0 +1,59 @@ +[build-system] +requires = ["maturin>=1.5.1,<1.6.0"] +build-backend = "maturin" + +[project] +name = "datafusion_table_providers" +version = "0.1.0" +description = "Build and run queries against data" +readme = "../README.md" +license = { file = "../LICENSE" } +requires-python = ">=3.9" +keywords = ["datafusion", "dataframe", "rust", "query-engine"] +classifier = [ + "Development Status :: 2 - Pre-Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "License :: OSI Approved", + "Operating System :: MacOS", + "Operating System :: Microsoft :: Windows", + "Operating System :: POSIX ::
Linux", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python", + "Programming Language :: Rust", +] +dependencies = ["datafusion>=45.0.0"] + +[project.urls] +repository = "https://github.com/datafusion-contrib/datafusion-table-providers" + +[tool.isort] +profile = "black" + +[tool.maturin] +python-source = "python" +module-name = "datafusion_table_providers._internal" +include = [{ path = "../Cargo.lock", format = "sdist" }] +exclude = [".github/**", "ci/**", ".asf.yaml"] +# Require Cargo.lock is up to date +locked = true + +# Enable docstring linting using the google style guide +[tool.ruff.lint] +select = ["E4", "E7", "E9", "F", "D", "W"] + +[tool.ruff.lint.pydocstyle] +convention = "google" + +[tool.ruff.lint.pycodestyle] +max-doc-length = 88 + +# Disable docstring checking for these directories +[tool.ruff.lint.per-file-ignores] +"python/tests/*" = ["D"] +"examples/*" = ["D", "W505"] +"dev/*" = ["D"] +"benchmarks/*" = ["D", "F"] +"docs/*" = ["D"] diff --git a/python/python/datafusion_table_providers/__init__.py b/python/python/datafusion_table_providers/__init__.py new file mode 100644 index 00000000..02cc74ad --- /dev/null +++ b/python/python/datafusion_table_providers/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Python package for datafusion table provider.""" diff --git a/python/python/datafusion_table_providers/duckdb.py b/python/python/datafusion_table_providers/duckdb.py new file mode 100644 index 00000000..96228c55 --- /dev/null +++ b/python/python/datafusion_table_providers/duckdb.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Python interface for DuckDB table provider.""" + +from typing import Any, List +from . import _internal +from enum import Enum + +class AccessMode(Enum): + """Python equivalent of rust duckdb::AccessMode Enum.""" + Automatic = "AUTOMATIC" + ReadOnly = "READ_ONLY" + ReadWrite = "READ_WRITE" + +class DuckDBTableFactory: + """DuckDB table factory.""" + + def __init__(self, path: str, access_mode: AccessMode = AccessMode.Automatic) -> None: + """Create a DuckDB table factory. 
+ + If creating an in-memory table factory, then specify path to be :memory: or none + and don't specify access_mode. If creating a file-based table factory, then + specify path and access_mode. + + Args: + path: Memory or file location + access_mode: Access mode configuration + """ + # TODO: think about the interface, restrict invalid combination of input + # arguments, for example, if path is memory, then access_mode should not be + # specified. + if path == ":memory:" or path == "": + self._raw = _internal.duckdb.RawDuckDBTableFactory.new_memory() + else: + self._raw = _internal.duckdb.RawDuckDBTableFactory.new_file(path, str(access_mode)) + + def tables(self) -> List[str]: + """Get all the table names.""" + return self._raw.tables() + + def get_table(self, table_reference: str) -> Any: + """Return table provider for the table named `table_reference`. + + Args: + table_reference (str): table name + """ + return self._raw.get_table(table_reference) diff --git a/python/python/datafusion_table_providers/sqlite.py b/python/python/datafusion_table_providers/sqlite.py new file mode 100644 index 00000000..845ec823 --- /dev/null +++ b/python/python/datafusion_table_providers/sqlite.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +"""Python interface for sqlite table provider.""" + +from typing import Any, List, Optional +from . import _internal + +class SqliteTableFactory: + """Sqlite table factory.""" + + def __init__(self, path: str, mode: str, busy_timeout_s: float, attach_databases: Optional[List[str]] = None) -> None: + """Create a sqlite table factory.""" + self._raw = _internal.sqlite.RawSqliteTableFactory(path, mode, busy_timeout_s, attach_databases) + + def tables(self) -> List[str]: + """Get all the table names.""" + return self._raw.tables() + + def get_table(self, table_reference: str) -> Any: + """Return the table provider for table named `table_reference`. + + Args: + table_reference (str): table name + """ + return self._raw.get_table(table_reference) diff --git a/python/src/duckdb.rs b/python/src/duckdb.rs new file mode 100644 index 00000000..41efbaf3 --- /dev/null +++ b/python/src/duckdb.rs @@ -0,0 +1,80 @@ +use std::{str::FromStr, sync::Arc}; + +use datafusion_table_providers::{ + duckdb::DuckDBTableFactory, + sql::db_connection_pool::{duckdbpool::DuckDbConnectionPool, DbConnectionPool}, +}; +use duckdb::AccessMode; +use pyo3::prelude::*; + +use crate::{ + utils::{to_pyerr, wait_for_future}, + RawTableProvider, +}; + +#[pyclass(module = "datafusion_table_providers._internal.duckdb")] +struct RawDuckDBTableFactory { + pool: Arc, + factory: DuckDBTableFactory, +} + +#[pymethods] +impl RawDuckDBTableFactory { + #[staticmethod] + #[pyo3(signature = ())] + pub fn new_memory() -> PyResult { + let pool = Arc::new(DuckDbConnectionPool::new_memory().map_err(to_pyerr)?); + + Ok(Self { + factory: DuckDBTableFactory::new(Arc::clone(&pool)), + pool, + }) + } + + #[staticmethod] + #[pyo3(signature = (path, access_mode))] + pub fn new_file(path: &str, access_mode: &str) -> PyResult { + let access_mode = AccessMode::from_str(access_mode).map_err(to_pyerr)?; + let pool = 
Arc::new(DuckDbConnectionPool::new_file(path, &access_mode).map_err(to_pyerr)?); + + Ok(Self { + factory: DuckDBTableFactory::new(Arc::clone(&pool)), + pool, + }) + } + + pub fn tables(&self, py: Python) -> PyResult> { + wait_for_future(py, async { + let conn = self.pool.connect().await.map_err(to_pyerr)?; + + let conn_sync = conn + .as_sync() + .ok_or(to_pyerr("Unable to create synchronous DuckDB connection"))?; + let schemas = conn_sync.schemas().map_err(to_pyerr)?; + + let mut tables = Vec::default(); + for schema in schemas { + let schema_tables = conn_sync.tables(&schema).map_err(to_pyerr)?; + tables.extend(schema_tables); + } + + Ok(tables) + }) + } + + pub fn get_table(&self, py: Python, table_reference: &str) -> PyResult { + let table = wait_for_future(py, self.factory.table_provider(table_reference.into())) + .map_err(to_pyerr)?; + + Ok(RawTableProvider { + table, + supports_pushdown_filters: true, + }) + } +} + +pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + + Ok(()) +} diff --git a/python/src/flight.rs b/python/src/flight.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/python/src/flight.rs @@ -0,0 +1 @@ + diff --git a/python/src/lib.rs b/python/src/lib.rs new file mode 100644 index 00000000..86b663f0 --- /dev/null +++ b/python/src/lib.rs @@ -0,0 +1,53 @@ +use std::{ffi::CString, sync::Arc}; + +use datafusion::catalog::TableProvider; +use datafusion_ffi::table_provider::FFI_TableProvider; +use pyo3::{prelude::*, types::PyCapsule}; + +#[pyclass(module = "datafusion_table_providers._internal")] +struct RawTableProvider { + pub(crate) table: Arc, + pub(crate) supports_pushdown_filters: bool, +} + +#[pymethods] +impl RawTableProvider { + fn __datafusion_table_provider__<'py>( + &self, + py: Python<'py>, + ) -> PyResult> { + let name = CString::new("datafusion_table_provider").unwrap(); + + let provider = FFI_TableProvider::new( + Arc::clone(&self.table), + self.supports_pushdown_filters, + None, 
+ ); + + PyCapsule::new(py, provider, Some(name.clone())) + } +} + +pub mod duckdb; +pub mod flight; +pub mod mysql; +pub mod odbc; +pub mod postgres; +pub mod sqlite; +pub mod utils; + +#[pymodule] +// module name need to match project name +fn _internal(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + + let sqlite = PyModule::new(py, "sqlite")?; + sqlite::init_module(&sqlite)?; + m.add_submodule(&sqlite)?; + + let duckdb = PyModule::new(py, "duckdb")?; + duckdb::init_module(&duckdb)?; + m.add_submodule(&duckdb)?; + + Ok(()) +} diff --git a/python/src/mysql.rs b/python/src/mysql.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/python/src/mysql.rs @@ -0,0 +1 @@ + diff --git a/python/src/odbc.rs b/python/src/odbc.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/python/src/odbc.rs @@ -0,0 +1 @@ + diff --git a/python/src/postgres.rs b/python/src/postgres.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/python/src/postgres.rs @@ -0,0 +1 @@ + diff --git a/python/src/sqlite.rs b/python/src/sqlite.rs new file mode 100644 index 00000000..c91c6e3f --- /dev/null +++ b/python/src/sqlite.rs @@ -0,0 +1,80 @@ +use std::{sync::Arc, time::Duration}; + +use datafusion_table_providers::{ + sql::db_connection_pool::{ + sqlitepool::{SqliteConnectionPool, SqliteConnectionPoolFactory}, + DbConnectionPool, + }, + sqlite::SqliteTableFactory, +}; +use pyo3::prelude::*; + +use crate::{ + utils::{to_pyerr, wait_for_future}, + RawTableProvider, +}; + +#[pyclass(module = "datafusion_table_providers._internal.sqlite")] +struct RawSqliteTableFactory { + pool: Arc, + factory: SqliteTableFactory, +} + +#[pymethods] +impl RawSqliteTableFactory { + #[new] + #[pyo3(signature = (path, mode, busy_timeout_s, attach_databases = None))] + pub fn new( + py: Python, + path: &str, + mode: String, + busy_timeout_s: f64, + attach_databases: Option>, + ) -> PyResult { + let mode = mode.as_str().into(); + let busy_timeout = 
Duration::from_secs_f64(busy_timeout_s); + let attach_databases = attach_databases.map(|d| d.into_iter().map(Arc::from).collect()); + let factory = SqliteConnectionPoolFactory::new(path, mode, busy_timeout) + .with_databases(attach_databases); + let pool = Arc::new(wait_for_future(py, factory.build()).map_err(to_pyerr)?); + + Ok(Self { + factory: SqliteTableFactory::new(Arc::clone(&pool)), + pool, + }) + } + + pub fn tables(&self, py: Python) -> PyResult> { + wait_for_future(py, async { + let conn = self.pool.connect().await.map_err(to_pyerr)?; + let conn_async = conn.as_async().ok_or(to_pyerr( + "Unable to create connection to sqlite db".to_string(), + ))?; + let schemas = conn_async.schemas().await.map_err(to_pyerr)?; + + let mut tables = Vec::default(); + for schema in schemas { + let schema_tables = conn_async.tables(&schema).await.map_err(to_pyerr)?; + tables.extend(schema_tables); + } + + Ok(tables) + }) + } + + pub fn get_table(&self, py: Python, table_reference: &str) -> PyResult { + let table = wait_for_future(py, self.factory.table_provider(table_reference.into())) + .map_err(to_pyerr)?; + + Ok(RawTableProvider { + table, + supports_pushdown_filters: true, + }) + } +} + +pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + + Ok(()) +} diff --git a/python/src/utils.rs b/python/src/utils.rs new file mode 100644 index 00000000..48c6893a --- /dev/null +++ b/python/src/utils.rs @@ -0,0 +1,24 @@ +use pyo3::{exceptions::PyException, prelude::*}; +use std::{future::Future, sync::OnceLock}; + +pub(crate) struct TokioRuntime(tokio::runtime::Runtime); + +#[inline] +pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime { + static RUNTIME: OnceLock = OnceLock::new(); + RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap())) +} + +/// Utility to collect rust futures with GIL released +pub fn wait_for_future(py: Python, f: F) -> F::Output +where + F: Future + Send, + F::Output: Send, +{ + let runtime: 
&tokio::runtime::Runtime = &get_tokio_runtime().0; + py.allow_threads(|| runtime.block_on(f)) +} + +pub fn to_pyerr(err: T) -> PyErr { + PyException::new_err(err.to_string()) +}