From 0dd45d7a456495b1019a3b3a5e50c730e95a8ef2 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 26 Dec 2024 10:11:26 -0500 Subject: [PATCH 01/19] Add optional reference to tokio runtime for table providers --- datafusion/ffi/Cargo.toml | 2 +- datafusion/ffi/src/execution_plan.rs | 30 ++++++++++++++++++----- datafusion/ffi/src/record_batch_stream.rs | 28 ++++++++++++++++++--- datafusion/ffi/src/table_provider.rs | 16 +++++++++--- 4 files changed, 62 insertions(+), 14 deletions(-) diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index a0179ec44d7f..c1ca69bb034e 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -44,7 +44,7 @@ datafusion-proto = { workspace = true } futures = { workspace = true } log = { workspace = true } prost = { workspace = true } +tokio = { workspace = true } [dev-dependencies] doc-comment = { workspace = true } -tokio = { workspace = true } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 5ab321cc0114..52f7a99ea41a 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -27,6 +27,7 @@ use datafusion::{ execution::{SendableRecordBatchStream, TaskContext}, physical_plan::{DisplayAs, ExecutionPlan, PlanProperties}, }; +use tokio::runtime::Runtime; use crate::{ plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream, @@ -71,6 +72,7 @@ unsafe impl Sync for FFI_ExecutionPlan {} pub struct ExecutionPlanPrivateData { pub plan: Arc, pub context: Arc, + pub runtime: Option>, } unsafe extern "C" fn properties_fn_wrapper( @@ -88,11 +90,14 @@ unsafe extern "C" fn children_fn_wrapper( let private_data = plan.private_data as *const ExecutionPlanPrivateData; let plan = &(*private_data).plan; let ctx = &(*private_data).context; + let runtime = &(*private_data).runtime; let children: Vec<_> = plan .children() .into_iter() - .map(|child| FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx))) + .map(|child| { + FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone()) + }) .collect(); children.into() @@ -105,9 +110,10 @@ unsafe extern "C" fn execute_fn_wrapper( let private_data = plan.private_data as *const ExecutionPlanPrivateData; let plan = &(*private_data).plan; let ctx = &(*private_data).context; + let runtime = (*private_data).runtime.as_ref().map(|rt| Arc::clone(rt)); match plan.execute(partition, Arc::clone(ctx)) { - Ok(rbs) => RResult::ROk(rbs.into()), + Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)), Err(e) => RResult::RErr( format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(), ), @@ -129,7 +135,11 @@ unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_Execution let private_data = plan.private_data as *const ExecutionPlanPrivateData; let plan_data = &(*private_data); - FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), Arc::clone(&plan_data.context)) + FFI_ExecutionPlan::new( + Arc::clone(&plan_data.plan), + Arc::clone(&plan_data.context), + plan_data.runtime.clone(), + ) } impl Clone for FFI_ExecutionPlan { @@ -140,8 +150,16 @@ impl Clone for FFI_ExecutionPlan { impl FFI_ExecutionPlan { /// This function is called on the provider's side. - pub fn new(plan: Arc, context: Arc) -> Self { - let private_data = Box::new(ExecutionPlanPrivateData { plan, context }); + pub fn new( + plan: Arc, + context: Arc, + runtime: Option>, + ) -> Self { + let private_data = Box::new(ExecutionPlanPrivateData { + plan, + context, + runtime, + }); Self { properties: properties_fn_wrapper, @@ -357,7 +375,7 @@ mod tests { let original_plan = Arc::new(EmptyExec::new(schema)); let original_name = original_plan.name().to_string(); - let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx()); + let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None); let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?; diff --git a/datafusion/ffi/src/record_batch_stream.rs b/datafusion/ffi/src/record_batch_stream.rs index c944e56c5cde..878ac24f6765 100644 --- a/datafusion/ffi/src/record_batch_stream.rs +++ b/datafusion/ffi/src/record_batch_stream.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{ffi::c_void, task::Poll}; +use std::{ffi::c_void, sync::Arc, task::Poll}; use abi_stable::{ std_types::{ROption, RResult, RString}, @@ -33,6 +33,7 @@ use datafusion::{ execution::{RecordBatchStream, SendableRecordBatchStream}, }; use futures::{Stream, TryStreamExt}; +use tokio::runtime::Runtime; use crate::arrow_wrappers::{WrappedArray, WrappedSchema}; @@ -58,12 +59,27 @@ pub struct FFI_RecordBatchStream { pub private_data: *mut c_void, } +pub struct RecordBatchStreamPrivateData { + pub rbs: SendableRecordBatchStream, + pub runtime: Option>, +} + impl From for FFI_RecordBatchStream { fn from(stream: SendableRecordBatchStream) -> Self { + Self::new(stream, None) + } +} + +impl FFI_RecordBatchStream { + pub fn new(stream: SendableRecordBatchStream, runtime: Option>) -> Self { + let private_data = Box::into_raw(Box::new(RecordBatchStreamPrivateData { + rbs: stream, + runtime, + })) as *mut c_void; FFI_RecordBatchStream { poll_next: poll_next_fn_wrapper, schema: schema_fn_wrapper, - private_data: Box::into_raw(Box::new(stream)) as *mut c_void, + private_data, } } } @@ -71,7 +87,8 @@ impl From for FFI_RecordBatchStream { unsafe impl Send for FFI_RecordBatchStream {} unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> WrappedSchema { - let stream = stream.private_data as *const SendableRecordBatchStream; + let private_data = stream.private_data as *const RecordBatchStreamPrivateData; + let stream = &(*private_data).rbs; (*stream).schema().into() } @@ -106,7 +123,10 @@ unsafe extern "C" fn poll_next_fn_wrapper( stream: &FFI_RecordBatchStream, cx: &mut FfiContext, ) -> FfiPoll>> { - let stream = stream.private_data as *mut SendableRecordBatchStream; + let private_data = stream.private_data as *mut RecordBatchStreamPrivateData; + let stream = &mut (*private_data).rbs; + + let _guard = (*private_data).runtime.as_ref().map(|rt| rt.enter()); let poll_result = cx.with_context(|std_cx| { (*stream) diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index b229d908d10d..69ab69f6ef8c 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -40,6 +40,7 @@ use datafusion_proto::{ protobuf::LogicalExprList, }; use prost::Message; +use tokio::runtime::Runtime; use crate::{ arrow_wrappers::WrappedSchema, @@ -149,6 +150,7 @@ unsafe impl Sync for FFI_TableProvider {} struct ProviderPrivateData { provider: Arc, + runtime: Option>, } unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema { @@ -216,6 +218,7 @@ unsafe extern "C" fn scan_fn_wrapper( let private_data = provider.private_data as *mut ProviderPrivateData; let internal_provider = &(*private_data).provider; let session_config = session_config.clone(); + let runtime = &(*private_data).runtime; async move { let config = match ForeignSessionConfig::try_from(&session_config) { @@ -261,7 +264,11 @@ unsafe extern "C" fn scan_fn_wrapper( Err(e) => return RResult::RErr(e.to_string().into()), }; - RResult::ROk(FFI_ExecutionPlan::new(plan, ctx.task_ctx())) + RResult::ROk(FFI_ExecutionPlan::new( + plan, + ctx.task_ctx(), + runtime.clone(), + )) } .into_ffi() } @@ -273,9 +280,11 @@ unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) { unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider { let old_private_data = provider.private_data as *const ProviderPrivateData; + let runtime = (*old_private_data).runtime.clone(); let private_data = Box::into_raw(Box::new(ProviderPrivateData { provider: Arc::clone(&(*old_private_data).provider), + runtime, })) as *mut c_void; FFI_TableProvider { @@ -300,8 +309,9 @@ impl FFI_TableProvider { pub fn new( provider: Arc, can_support_pushdown_filters: bool, + runtime: Option>, ) -> Self { - let private_data = Box::new(ProviderPrivateData { provider }); + let private_data = Box::new(ProviderPrivateData { provider, runtime }); Self { schema: schema_fn_wrapper, @@ -463,7 +473,7 @@ mod tests { let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?); - let ffi_provider = FFI_TableProvider::new(provider, true); + let ffi_provider = FFI_TableProvider::new(provider, true, None); let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into(); From 4087f831015383997201969d6df7e9f20ed10381 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 27 Dec 2024 10:38:09 -0500 Subject: [PATCH 02/19] Add function to return the library version over FFI --- datafusion/ffi/Cargo.toml | 1 + datafusion/ffi/src/lib.rs | 9 +++++++++ datafusion/ffi/src/table_provider.rs | 5 +++++ 3 files changed, 15 insertions(+) diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index c1ca69bb034e..1550629edc43 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -44,6 +44,7 @@ datafusion-proto = { workspace = true } futures = { workspace = true } log = { workspace = true } prost = { workspace = true } +semver = "1.0.24" tokio = { workspace = true } [dev-dependencies] diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 8e09780edf03..7187de15474b 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -26,5 +26,14 @@ pub mod session_config; pub mod table_provider; pub mod table_source; +/// Returns the major version of the FFI implementation. If the API evolves, +/// we use the major version to identify compatibility over the unsafe +/// boundary. +pub unsafe extern "C" fn version() -> u64 { + let version_str = env!("CARGO_PKG_VERSION"); + let version = semver::Version::parse(version_str).expect("Invalid version string"); + version.major +} + #[cfg(doctest)] doc_comment::doctest!("../README.md", readme_example_test); diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 69ab69f6ef8c..183dfc8755d1 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -140,6 +140,9 @@ pub struct FFI_TableProvider { /// Release the memory of the private data when it is no longer being used. pub release: unsafe extern "C" fn(arg: &mut Self), + /// Return the major DataFusion version number of this provider. + pub version: unsafe extern "C" fn() -> u64, + /// Internal data. This is only to be accessed by the provider of the plan. /// A [`ForeignExecutionPlan`] should never attempt to access this data. pub private_data: *mut c_void, @@ -294,6 +297,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_Table supports_filters_pushdown: provider.supports_filters_pushdown, clone: clone_fn_wrapper, release: release_fn_wrapper, + version: super::version, private_data, } } @@ -323,6 +327,7 @@ impl FFI_TableProvider { }, clone: clone_fn_wrapper, release: release_fn_wrapper, + version: super::version, private_data: Box::into_raw(private_data) as *mut c_void, } } From 5b085fa194c4ade90f8da62dcbb12552e8d66bb3 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 27 Dec 2024 10:49:21 -0500 Subject: [PATCH 03/19] Resolve clippy warnings --- .../examples/ffi/ffi_example_table_provider/src/lib.rs | 2 +- datafusion/ffi/src/execution_plan.rs | 2 +- datafusion/ffi/src/lib.rs | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs b/datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs index c7eea8a8070b..c37d8f835ce8 100644 --- a/datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs +++ b/datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs @@ -53,7 +53,7 @@ extern "C" fn construct_simple_table_provider() -> FFI_TableProvider { let table_provider = MemTable::try_new(schema, vec![batches]).unwrap(); - FFI_TableProvider::new(Arc::new(table_provider), true) + FFI_TableProvider::new(Arc::new(table_provider), true, None) } #[export_root_module] diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index 52f7a99ea41a..a8c2f42fe251 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -110,7 +110,7 @@ unsafe extern "C" fn execute_fn_wrapper( let private_data = plan.private_data as *const ExecutionPlanPrivateData; let plan = &(*private_data).plan; let ctx = &(*private_data).context; - let runtime = (*private_data).runtime.as_ref().map(|rt| Arc::clone(rt)); + let runtime = (*private_data).runtime.as_ref().map(Arc::clone); match plan.execute(partition, Arc::clone(ctx)) { Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)), diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 7187de15474b..5f9f0a12944d 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -29,6 +29,10 @@ pub mod table_source; /// Returns the major version of the FFI implementation. If the API evolves, /// we use the major version to identify compatibility over the unsafe /// boundary. +/// +/// # Safety +/// +/// In general this function should always be safe to call from external libraries. pub unsafe extern "C" fn version() -> u64 { let version_str = env!("CARGO_PKG_VERSION"); let version = semver::Version::parse(version_str).expect("Invalid version string"); From 766ec26fd69eb1d92dfe4f2fe7f3cccd531019ee Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 27 Dec 2024 11:17:51 -0500 Subject: [PATCH 04/19] Function does not need to be defined as unsafe --- datafusion/ffi/src/lib.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 5f9f0a12944d..f71cc16457a3 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -29,11 +29,7 @@ pub mod table_source; /// Returns the major version of the FFI implementation. If the API evolves, /// we use the major version to identify compatibility over the unsafe /// boundary. -/// -/// # Safety -/// -/// In general this function should always be safe to call from external libraries. -pub unsafe extern "C" fn version() -> u64 { +pub extern "C" fn version() -> u64 { let version_str = env!("CARGO_PKG_VERSION"); let version = semver::Version::parse(version_str).expect("Invalid version string"); version.major From f75ba367eb570893f21dadfb6488345bcdc40e90 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 29 Dec 2024 07:59:21 -0500 Subject: [PATCH 05/19] Add integration test for FFI table provider --- Cargo.toml | 1 + datafusion/ffitest/Cargo.toml | 47 ++++ datafusion/ffitest/src/async_provider.rs | 255 +++++++++++++++++++++ datafusion/ffitest/src/lib.rs | 96 ++++++++ datafusion/ffitest/src/sync_provider.rs | 22 ++ datafusion/ffitest/tests/table_provider.rs | 71 ++++++ 6 files changed, 492 insertions(+) create mode 100644 datafusion/ffitest/Cargo.toml create mode 100644 datafusion/ffitest/src/async_provider.rs create mode 100644 datafusion/ffitest/src/lib.rs create mode 100644 datafusion/ffitest/src/sync_provider.rs create mode 100644 datafusion/ffitest/tests/table_provider.rs diff --git a/Cargo.toml b/Cargo.toml index 1581c115f505..1c2900b1deb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ members = [ "datafusion/expr-common", "datafusion/execution", "datafusion/ffi", + "datafusion/ffitest", "datafusion/functions", "datafusion/functions-aggregate", "datafusion/functions-aggregate-common", diff --git a/datafusion/ffitest/Cargo.toml b/datafusion/ffitest/Cargo.toml new file mode 100644 index 000000000000..e11e6aed49b1 --- /dev/null +++ b/datafusion/ffitest/Cargo.toml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-ffi-test" +description = "Integration tests for DataFusion FFI" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +publish = false +# Specify MSRV here as `cargo msrv` doesn't support workspace version +rust-version = "1.80.1" + +[lints] +workspace = true + +[lib] +name = "datafusion_ffi_test" +crate-type = ["cdylib", "rlib"] + +[dependencies] +abi_stable = "0.11.3" +arrow = { workspace = true } +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +async-trait = { workspace = true } +datafusion = { workspace = true, default-features = false } +datafusion-ffi = { workspace = true } +futures = { workspace = true } +tokio = { workspace = true } diff --git a/datafusion/ffitest/src/async_provider.rs b/datafusion/ffitest/src/async_provider.rs new file mode 100644 index 000000000000..24d0fcd2299e --- /dev/null +++ b/datafusion/ffitest/src/async_provider.rs @@ -0,0 +1,255 @@ +use std::{any::Any, fmt::Debug, sync::Arc}; + +use arrow_array::RecordBatch; +use arrow_schema::Schema; +use async_trait::async_trait; +use datafusion::{ + catalog::{Session, TableProvider}, + error::{DataFusionError, Result}, + execution::RecordBatchStream, + physical_expr::EquivalenceProperties, + physical_plan::{ExecutionPlan, Partitioning}, + prelude::Expr, +}; +use datafusion_ffi::table_provider::FFI_TableProvider; +use futures::Stream; +use tokio::{ + runtime::Runtime, + sync::{broadcast, mpsc}, +}; + +use crate::create_record_batch; + +#[derive(Debug)] +pub struct AsyncTableProvider { + batch_request: mpsc::Sender, + shutdown: mpsc::Sender, + batch_receiver: broadcast::Receiver>, + _join_handle: Option>, +} + +fn async_table_provider_task( + mut shutdown: mpsc::Receiver, + mut batch_request: mpsc::Receiver, + batch_sender: broadcast::Sender>, + tokio_rt: mpsc::Sender>, +) { + let runtime = Arc::new( + tokio::runtime::Builder::new_current_thread() + .build() + .expect("Unable to create tokio runtime"), + ); + let _runtime_guard = runtime.enter(); + tokio_rt + .blocking_send(Arc::clone(&runtime)) + .expect("Unable to send tokio runtime back to main thread"); + + runtime.block_on(async move { + let mut num_received = 0; + while let Some(true) = batch_request.recv().await { + let record_batch = match num_received { + 0 => Some(create_record_batch(1, 5)), + 1 => Some(create_record_batch(6, 1)), + 2 => Some(create_record_batch(7, 5)), + _ => None, + }; + num_received += 1; + + if batch_sender.send(record_batch).is_err() { + break; + } + } + }); + + let _ = shutdown.blocking_recv(); +} + +pub fn start_async_provider() -> (AsyncTableProvider, Arc) { + let (batch_request_tx, batch_request_rx) = mpsc::channel(10); + let (record_batch_tx, record_batch_rx) = broadcast::channel(10); + let (tokio_rt_tx, mut tokio_rt_rx) = mpsc::channel(10); + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + + // It is important that we are not using tokio to spawn here. We want this + // other thread to create it's own runtime, which is similar to a model used + // in datafusion-python and probably other places. This will let us test that + // we do correctly enter the runtime of the foreign provider. + let join_handle = Some(std::thread::spawn(move || { + async_table_provider_task( + shutdown_rx, + batch_request_rx, + record_batch_tx, + tokio_rt_tx, + ) + })); + + let tokio_rt = tokio_rt_rx + .blocking_recv() + .expect("Unable to receive tokio runtime from spawned thread"); + + let table_provider = AsyncTableProvider { + shutdown: shutdown_tx, + batch_request: batch_request_tx, + batch_receiver: record_batch_rx, + _join_handle: join_handle, + }; + + (table_provider, tokio_rt) +} + +#[async_trait] +impl TableProvider for AsyncTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> Arc { + super::create_test_schema() + } + + fn table_type(&self) -> datafusion::logical_expr::TableType { + datafusion::logical_expr::TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(Arc::new(AsyncTestExecutionPlan::new( + self.batch_request.clone(), + self.batch_receiver.resubscribe(), + ))) + } +} + +impl Drop for AsyncTableProvider { + fn drop(&mut self) { + self.shutdown + .blocking_send(false) + .expect("Unable to call shutdown on spawned thread.") + } +} + +#[derive(Debug)] +struct AsyncTestExecutionPlan { + properties: datafusion::physical_plan::PlanProperties, + batch_request: mpsc::Sender, + batch_receiver: broadcast::Receiver>, +} + +impl AsyncTestExecutionPlan { + pub fn new( + batch_request: mpsc::Sender, + batch_receiver: broadcast::Receiver>, + ) -> Self { + Self { + properties: datafusion::physical_plan::PlanProperties::new( + EquivalenceProperties::new(super::create_test_schema()), + Partitioning::UnknownPartitioning(3), + datafusion::physical_plan::execution_plan::EmissionType::Incremental, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, + ), + batch_request, + batch_receiver, + } + } +} + +impl ExecutionPlan for AsyncTestExecutionPlan { + fn name(&self) -> &str { + "async test execution plan" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + Vec::default() + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + Ok(Box::pin(AsyncTestRecordBatchStream { + batch_request: self.batch_request.clone(), + batch_receiver: self.batch_receiver.resubscribe(), + })) + } +} + +impl datafusion::physical_plan::DisplayAs for AsyncTestExecutionPlan { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + _f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + // Do nothing, just a test + Ok(()) + } +} + +struct AsyncTestRecordBatchStream { + batch_request: mpsc::Sender, + batch_receiver: broadcast::Receiver>, +} + +impl RecordBatchStream for AsyncTestRecordBatchStream { + fn schema(&self) -> arrow_schema::SchemaRef { + super::create_test_schema() + } +} + +impl Stream for AsyncTestRecordBatchStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut this = self.as_mut(); + + #[allow(clippy::disallowed_methods)] + tokio::spawn(async move { + // Nothing to do. We just need to simulate an async + // task running + }); + + if let Err(e) = this.batch_request.try_send(true) { + return std::task::Poll::Ready(Some(Err(DataFusionError::Execution( + format!("Unable to send batch request, {}", e), + )))); + } + + match this.batch_receiver.blocking_recv() { + Ok(batch) => match batch { + Some(batch) => std::task::Poll::Ready(Some(Ok(batch))), + None => std::task::Poll::Ready(None), + }, + Err(e) => std::task::Poll::Ready(Some(Err(DataFusionError::Execution( + format!("Unable receive record batch: {}", e), + )))), + } + } +} + +pub(crate) fn create_async_table_provider() -> FFI_TableProvider { + let (table_provider, tokio_rt) = start_async_provider(); + FFI_TableProvider::new(Arc::new(table_provider), true, Some(tokio_rt)) +} diff --git a/datafusion/ffitest/src/lib.rs b/datafusion/ffitest/src/lib.rs new file mode 100644 index 000000000000..2c6e54230247 --- /dev/null +++ b/datafusion/ffitest/src/lib.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use abi_stable::{ + declare_root_module_statics, export_root_module, + library::{LibraryError, RootModule}, + package_version_strings, + prefix_type::PrefixTypeTrait, + sabi_types::VersionStrings, + StableAbi, +}; + +use arrow_array::RecordBatch; +use async_provider::create_async_table_provider; +use datafusion::{ + arrow::datatypes::{DataType, Field, Schema}, + common::record_batch, +}; +use datafusion_ffi::table_provider::FFI_TableProvider; +use sync_provider::create_sync_table_provider; + +mod async_provider; +mod sync_provider; + +#[repr(C)] +#[derive(StableAbi)] +#[sabi(kind(Prefix(prefix_ref = TableProviderModuleRef)))] +/// This struct defines the module interfaces. It is to be shared by +/// both the module loading program and library that implements the +/// module. It is possible to move this definition into the loading +/// program and reference it in the modules, but this example shows +/// how a user may wish to separate these concerns. +pub struct TableProviderModule { + /// Constructs the table provider + pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider, +} + +impl RootModule for TableProviderModuleRef { + declare_root_module_statics! {TableProviderModuleRef} + const BASE_NAME: &'static str = "datafusion_ffi_test"; + const NAME: &'static str = "datafusion_ffi_test"; + const VERSION_STRINGS: VersionStrings = package_version_strings!(); + + fn initialization(self) -> Result { + Ok(self) + } +} + +fn create_test_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Float64, true), + ])) +} + +pub fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch { + let end_value = start_value + num_values as i32; + let a_vals: Vec = (start_value..end_value).collect(); + let b_vals: Vec = a_vals.iter().map(|v| *v as f64).collect(); + + record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap() +} + +/// Here we only wish to create a simple table provider as an example. +/// We create an in-memory table and convert it to it's FFI counterpart. +extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider { + match synchronous { + true => create_sync_table_provider(), + false => create_async_table_provider(), + } +} + +#[export_root_module] +/// This defines the entry point for using the module. +pub fn get_simple_memory_table() -> TableProviderModuleRef { + TableProviderModule { + create_table: construct_table_provider, + } + .leak_into_prefix() +} diff --git a/datafusion/ffitest/src/sync_provider.rs b/datafusion/ffitest/src/sync_provider.rs new file mode 100644 index 000000000000..46ce0314b94a --- /dev/null +++ b/datafusion/ffitest/src/sync_provider.rs @@ -0,0 +1,22 @@ +use std::sync::Arc; + +use datafusion::datasource::MemTable; +use datafusion_ffi::table_provider::FFI_TableProvider; + +use crate::{create_record_batch, create_test_schema}; + +pub(crate) fn create_sync_table_provider() -> FFI_TableProvider { + let schema = create_test_schema(); + + // It is useful to create these as multiple record batches + // so that we can demonstrate the FFI stream. + let batches = vec![ + create_record_batch(1, 5), + create_record_batch(6, 1), + create_record_batch(7, 5), + ]; + + let table_provider = MemTable::try_new(schema, vec![batches]).unwrap(); + + FFI_TableProvider::new(Arc::new(table_provider), true, None) +} diff --git a/datafusion/ffitest/tests/table_provider.rs b/datafusion/ffitest/tests/table_provider.rs new file mode 100644 index 000000000000..a5fdc854036b --- /dev/null +++ b/datafusion/ffitest/tests/table_provider.rs @@ -0,0 +1,71 @@ +use abi_stable::library::development_utils::compute_library_path; +use abi_stable::library::RootModule; +use datafusion::error::{DataFusionError, Result}; +use datafusion::prelude::SessionContext; +use datafusion_ffi::table_provider::ForeignTableProvider; +use datafusion_ffi_test::TableProviderModuleRef; +use std::path::Path; +use std::sync::Arc; + +/// It is important that this test is in the `tests` directory and not in the +/// library directory so we can verify we are building a dynamic library and +/// testing it via a different executable. +async fn test_table_provider(synchronous: bool) -> Result<()> { + let crate_root = Path::new(env!("CARGO_MANIFEST_DIR")); + let target_dir = crate_root + .parent() + .expect("Failed to find crate parent") + .parent() + .expect("Failed to find workspace root") + .join("target"); + + // Find the location of the library. This is specific to the build environment, + // so you will need to change the approach here based on your use case. + // let target: &std::path::Path = "../../../../target/".as_ref(); + let library_path = + compute_library_path::(target_dir.as_path()) + .map_err(|e| DataFusionError::External(Box::new(e)))? + .join("deps"); + + // Load the module + let table_provider_module = + TableProviderModuleRef::load_from_directory(&library_path) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + // By calling the code below, the table provided will be created within + // the module's code. + let ffi_table_provider = + table_provider_module + .create_table() + .ok_or(DataFusionError::NotImplemented( + "External table provider failed to implement create_table".to_string(), + ))?(synchronous); + + // In order to access the table provider within this executable, we need to + // turn it into a `ForeignTableProvider`. + let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into(); + + let ctx = SessionContext::new(); + + // Display the data to show the full cycle works. + ctx.register_table("external_table", Arc::new(foreign_table_provider))?; + let df = ctx.table("external_table").await?; + let results = df.collect().await?; + + assert_eq!(results.len(), 3); + assert_eq!(results[0], datafusion_ffi_test::create_record_batch(1, 5)); + assert_eq!(results[1], datafusion_ffi_test::create_record_batch(6, 1)); + assert_eq!(results[2], datafusion_ffi_test::create_record_batch(7, 5)); + + Ok(()) +} + +#[tokio::test] +async fn async_test_table_provider() -> Result<()> { + test_table_provider(false).await +} + +#[tokio::test] +async fn sync_test_table_provider() -> Result<()> { + test_table_provider(true).await +} From f62d5bdf26fc751060d4ff794238c9b642072478 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 29 Dec 2024 08:08:44 -0500 Subject: [PATCH 06/19] Add version call on FFI integration test --- datafusion/ffitest/src/lib.rs | 3 +++ datafusion/ffitest/tests/table_provider.rs | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/datafusion/ffitest/src/lib.rs b/datafusion/ffitest/src/lib.rs index 2c6e54230247..0239aa9b6cb8 100644 --- a/datafusion/ffitest/src/lib.rs +++ b/datafusion/ffitest/src/lib.rs @@ -49,6 +49,8 @@ mod sync_provider; pub struct TableProviderModule { /// Constructs the table provider pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider, + + pub version: extern "C" fn() -> u64, } impl RootModule for TableProviderModuleRef { @@ -91,6 +93,7 @@ extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider { pub fn get_simple_memory_table() -> TableProviderModuleRef { TableProviderModule { create_table: construct_table_provider, + version: datafusion_ffi::version, } .leak_into_prefix() } diff --git a/datafusion/ffitest/tests/table_provider.rs b/datafusion/ffitest/tests/table_provider.rs index a5fdc854036b..393ecfdaed0f 100644 --- a/datafusion/ffitest/tests/table_provider.rs +++ b/datafusion/ffitest/tests/table_provider.rs @@ -11,6 +11,8 @@ use std::sync::Arc; /// library directory so we can verify we are building a dynamic library and /// testing it via a different executable. async fn test_table_provider(synchronous: bool) -> Result<()> { + let expected_version = datafusion_ffi::version(); + let crate_root = Path::new(env!("CARGO_MANIFEST_DIR")); let target_dir = crate_root .parent() @@ -32,6 +34,13 @@ async fn test_table_provider(synchronous: bool) -> Result<()> { TableProviderModuleRef::load_from_directory(&library_path) .map_err(|e| DataFusionError::External(Box::new(e)))?; + assert_eq!( + table_provider_module + .version() + .expect("Unable to call version on FFI module")(), + expected_version + ); + // By calling the code below, the table provided will be created within // the module's code. let ffi_table_provider = From 8f66b4ea076fb4cb06c0e6625d9a00e12f5d9f5f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 29 Dec 2024 10:21:29 -0500 Subject: [PATCH 07/19] Making use explicit on crate to try to get CI to ensure it builds first --- datafusion/ffitest/tests/table_provider.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/ffitest/tests/table_provider.rs b/datafusion/ffitest/tests/table_provider.rs index 393ecfdaed0f..856394b8f4f9 100644 --- a/datafusion/ffitest/tests/table_provider.rs +++ b/datafusion/ffitest/tests/table_provider.rs @@ -3,6 +3,8 @@ use abi_stable::library::RootModule; use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::SessionContext; use datafusion_ffi::table_provider::ForeignTableProvider; +#[allow(clippy::single_component_path_imports)] +use datafusion_ffi_test; use datafusion_ffi_test::TableProviderModuleRef; use std::path::Path; use std::sync::Arc; From 883ec4872092f8b11c143142795fb6bd51d587b5 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 29 Dec 2024 10:24:25 -0500 Subject: [PATCH 08/19] Add license text --- datafusion/ffitest/src/async_provider.rs | 17 +++++++++++++++++ datafusion/ffitest/src/sync_provider.rs | 17 +++++++++++++++++ datafusion/ffitest/tests/table_provider.rs | 17 +++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/datafusion/ffitest/src/async_provider.rs b/datafusion/ffitest/src/async_provider.rs index 24d0fcd2299e..4c190df5c051 100644 --- a/datafusion/ffitest/src/async_provider.rs +++ b/datafusion/ffitest/src/async_provider.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::{any::Any, fmt::Debug, sync::Arc}; use arrow_array::RecordBatch; diff --git a/datafusion/ffitest/src/sync_provider.rs b/datafusion/ffitest/src/sync_provider.rs index 46ce0314b94a..3c0514cd501c 100644 --- a/datafusion/ffitest/src/sync_provider.rs +++ b/datafusion/ffitest/src/sync_provider.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::sync::Arc; use datafusion::datasource::MemTable; diff --git a/datafusion/ffitest/tests/table_provider.rs b/datafusion/ffitest/tests/table_provider.rs index 856394b8f4f9..815743e32cfa 100644 --- a/datafusion/ffitest/tests/table_provider.rs +++ b/datafusion/ffitest/tests/table_provider.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use abi_stable::library::development_utils::compute_library_path; use abi_stable::library::RootModule; use datafusion::error::{DataFusionError, Result}; From ef5e8b67f939eccd3e353f57a83e687bf6f5228d Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 16 Jan 2025 18:49:41 -0500 Subject: [PATCH 09/19] Fix unit test to find deps in ci profile --- datafusion/ffitest/tests/table_provider.rs | 34 +++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/datafusion/ffitest/tests/table_provider.rs b/datafusion/ffitest/tests/table_provider.rs index 815743e32cfa..8763e532c8be 100644 --- a/datafusion/ffitest/tests/table_provider.rs +++ b/datafusion/ffitest/tests/table_provider.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use abi_stable::library::development_utils::compute_library_path; use abi_stable::library::RootModule; use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::SessionContext; @@ -26,6 +25,39 @@ use datafusion_ffi_test::TableProviderModuleRef; use std::path::Path; use std::sync::Arc; +/// Compute the path to the library. It would be preferable to simply use +/// abi_stable::library::development_utils::compute_library_path however +/// our current CI pipeline has a `ci` profile that we need to use to +/// find the library. +pub fn compute_library_path( + target_path: &Path, +) -> std::io::Result { + let debug_dir = target_path.join("debug"); + let release_dir = target_path.join("release"); + let ci_dir = target_path.join("ci"); + + let debug_path = M::get_library_path(&debug_dir.join("deps")); + let release_path = M::get_library_path(&release_dir.join("deps")); + let ci_path = M::get_library_path(&ci_dir.join("deps")); + + let all_paths = vec![ + (debug_dir.clone(), debug_path), + (release_dir, release_path), + (ci_dir, ci_path), + ]; + + let best_path = all_paths + .into_iter() + .filter(|(_, path)| path.exists()) + .filter_map(|(dir, path)| path.metadata().map(|m| (dir, m)).ok()) + .filter_map(|(dir, meta)| meta.modified().map(|m| (dir, m)).ok()) + .max_by_key(|(_, date)| *date) + .map(|(dir, _)| dir) + .unwrap_or(debug_dir); + + Ok(best_path) +} + /// It is important that this test is in the `tests` directory and not in the /// library directory so we can verify we are building a dynamic library and /// testing it via a different executable. From ccab80c8b01701008f1b04a7079f7703505f4fa9 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 25 Jan 2025 13:11:35 -0500 Subject: [PATCH 10/19] Remove ffitest crate and put test lib behind a feature flag --- Cargo.toml | 1 - ci/scripts/rust_clippy.sh | 2 +- datafusion/ffi/Cargo.toml | 6 +++ datafusion/ffi/src/lib.rs | 3 ++ .../src => ffi/src/tests}/async_provider.rs | 4 +- .../src/lib.rs => ffi/src/tests/mod.rs} | 8 ++-- .../src => ffi/src/tests}/sync_provider.rs | 4 +- .../{ffitest => ffi}/tests/table_provider.rs | 10 ++-- datafusion/ffitest/Cargo.toml | 47 ------------------- 9 files changed, 22 insertions(+), 63 deletions(-) rename datafusion/{ffitest/src => ffi/src/tests}/async_provider.rs (98%) rename datafusion/{ffitest/src/lib.rs => ffi/src/tests/mod.rs} (94%) rename datafusion/{ffitest/src => ffi/src/tests}/sync_provider.rs (92%) rename datafusion/{ffitest => ffi}/tests/table_provider.rs (93%) delete mode 100644 datafusion/ffitest/Cargo.toml diff --git a/Cargo.toml b/Cargo.toml index 1c2900b1deb5..1581c115f505 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ members = [ "datafusion/expr-common", "datafusion/execution", "datafusion/ffi", - "datafusion/ffitest", "datafusion/functions", "datafusion/functions-aggregate", "datafusion/functions-aggregate-common", diff --git a/ci/scripts/rust_clippy.sh b/ci/scripts/rust_clippy.sh index f5c8b61e1c06..01eb6e710a2d 100755 --- a/ci/scripts/rust_clippy.sh +++ b/ci/scripts/rust_clippy.sh @@ -18,6 +18,6 @@ # under the License. set -ex -cargo clippy --all-targets --workspace --features avro,pyarrow -- -D warnings +cargo clippy --all-targets --workspace --features avro,pyarrow,integration-tests -- -D warnings cd datafusion-cli cargo clippy --all-targets --all-features -- -D warnings diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index 1550629edc43..1a6248322d1c 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -33,10 +33,13 @@ workspace = true [lib] name = "datafusion_ffi" path = "src/lib.rs" +crate-type = ["cdylib", "rlib"] [dependencies] abi_stable = "0.11.3" arrow = { workspace = true, features = ["ffi"] } +arrow-array = { workspace = true } +arrow-schema = { workspace = true } async-ffi = { version = "0.5.0", features = ["abi_stable"] } async-trait = { workspace = true } datafusion = { workspace = true, default-features = false } @@ -49,3 +52,6 @@ tokio = { workspace = true } [dev-dependencies] doc-comment = { workspace = true } + +[features] +integration-tests = [] diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index f71cc16457a3..366e9fbbe1a3 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -26,6 +26,9 @@ pub mod session_config; pub mod table_provider; pub mod table_source; +#[cfg(feature = "integration-tests")] +pub mod tests; + /// Returns the major version of the FFI implementation. If the API evolves, /// we use the major version to identify compatibility over the unsafe /// boundary. diff --git a/datafusion/ffitest/src/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs similarity index 98% rename from datafusion/ffitest/src/async_provider.rs rename to datafusion/ffi/src/tests/async_provider.rs index 4c190df5c051..8dbdb4fc3a16 100644 --- a/datafusion/ffitest/src/async_provider.rs +++ b/datafusion/ffi/src/tests/async_provider.rs @@ -17,6 +17,7 @@ use std::{any::Any, fmt::Debug, sync::Arc}; +use crate::table_provider::FFI_TableProvider; use arrow_array::RecordBatch; use arrow_schema::Schema; use async_trait::async_trait; @@ -28,14 +29,13 @@ use datafusion::{ physical_plan::{ExecutionPlan, Partitioning}, prelude::Expr, }; -use datafusion_ffi::table_provider::FFI_TableProvider; use futures::Stream; use tokio::{ runtime::Runtime, sync::{broadcast, mpsc}, }; -use crate::create_record_batch; +use super::create_record_batch; #[derive(Debug)] pub struct AsyncTableProvider { diff --git a/datafusion/ffitest/src/lib.rs b/datafusion/ffi/src/tests/mod.rs similarity index 94% rename from datafusion/ffitest/src/lib.rs rename to datafusion/ffi/src/tests/mod.rs index 0239aa9b6cb8..d2e865d6c2a1 100644 --- a/datafusion/ffitest/src/lib.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -26,13 +26,13 @@ use abi_stable::{ StableAbi, }; +use super::table_provider::FFI_TableProvider; use arrow_array::RecordBatch; use async_provider::create_async_table_provider; use datafusion::{ arrow::datatypes::{DataType, Field, Schema}, common::record_batch, }; -use datafusion_ffi::table_provider::FFI_TableProvider; use sync_provider::create_sync_table_provider; mod async_provider; @@ -55,8 +55,8 @@ pub struct TableProviderModule { impl RootModule for TableProviderModuleRef { declare_root_module_statics! {TableProviderModuleRef} - const BASE_NAME: &'static str = "datafusion_ffi_test"; - const NAME: &'static str = "datafusion_ffi_test"; + const BASE_NAME: &'static str = "datafusion_ffi"; + const NAME: &'static str = "datafusion_ffi"; const VERSION_STRINGS: VersionStrings = package_version_strings!(); fn initialization(self) -> Result { @@ -93,7 +93,7 @@ extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider { pub fn get_simple_memory_table() -> TableProviderModuleRef { TableProviderModule { create_table: construct_table_provider, - version: datafusion_ffi::version, + version: super::version, } .leak_into_prefix() } diff --git a/datafusion/ffitest/src/sync_provider.rs b/datafusion/ffi/src/tests/sync_provider.rs similarity index 92% rename from datafusion/ffitest/src/sync_provider.rs rename to datafusion/ffi/src/tests/sync_provider.rs index 3c0514cd501c..ff85e0b15b39 100644 --- a/datafusion/ffitest/src/sync_provider.rs +++ b/datafusion/ffi/src/tests/sync_provider.rs @@ -17,10 +17,10 @@ use std::sync::Arc; +use crate::table_provider::FFI_TableProvider; use datafusion::datasource::MemTable; -use datafusion_ffi::table_provider::FFI_TableProvider; -use crate::{create_record_batch, create_test_schema}; +use super::{create_record_batch, create_test_schema}; pub(crate) fn create_sync_table_provider() -> FFI_TableProvider { let schema = create_test_schema(); diff --git a/datafusion/ffitest/tests/table_provider.rs b/datafusion/ffi/tests/table_provider.rs similarity index 93% rename from datafusion/ffitest/tests/table_provider.rs rename to datafusion/ffi/tests/table_provider.rs index 8763e532c8be..f1c82c252166 100644 --- a/datafusion/ffitest/tests/table_provider.rs +++ b/datafusion/ffi/tests/table_provider.rs @@ -19,9 +19,7 @@ use abi_stable::library::RootModule; use datafusion::error::{DataFusionError, Result}; use datafusion::prelude::SessionContext; use datafusion_ffi::table_provider::ForeignTableProvider; -#[allow(clippy::single_component_path_imports)] -use datafusion_ffi_test; -use datafusion_ffi_test::TableProviderModuleRef; +use datafusion_ffi::tests::TableProviderModuleRef; use std::path::Path; use std::sync::Arc; @@ -113,9 +111,9 @@ async fn test_table_provider(synchronous: bool) -> Result<()> { let results = df.collect().await?; assert_eq!(results.len(), 3); - assert_eq!(results[0], datafusion_ffi_test::create_record_batch(1, 5)); - assert_eq!(results[1], datafusion_ffi_test::create_record_batch(6, 1)); - assert_eq!(results[2], datafusion_ffi_test::create_record_batch(7, 5)); + assert_eq!(results[0], datafusion_ffi::tests::create_record_batch(1, 5)); + assert_eq!(results[1], datafusion_ffi::tests::create_record_batch(6, 1)); + assert_eq!(results[2], datafusion_ffi::tests::create_record_batch(7, 5)); Ok(()) } diff --git a/datafusion/ffitest/Cargo.toml b/datafusion/ffitest/Cargo.toml deleted file mode 100644 index e11e6aed49b1..000000000000 --- a/datafusion/ffitest/Cargo.toml +++ /dev/null @@ -1,47 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "datafusion-ffi-test" -description = "Integration tests for DataFusion FFI" -version = { workspace = true } -edition = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -license = { workspace = true } -authors = { workspace = true } -publish = false -# Specify MSRV here as `cargo msrv` doesn't support workspace version -rust-version = "1.80.1" - -[lints] -workspace = true - -[lib] -name = "datafusion_ffi_test" -crate-type = ["cdylib", "rlib"] - -[dependencies] -abi_stable = "0.11.3" -arrow = { workspace = true } -arrow-array = { workspace = true } -arrow-schema = { workspace = true } -async-trait = { workspace = true } -datafusion = { workspace = true, default-features = false } -datafusion-ffi = { workspace = true } -futures = { workspace = true } -tokio = { workspace = true } From d90dcd78a7680899c3ce26f4e0359aed3a363806 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 25 Jan 2025 13:15:10 -0500 Subject: [PATCH 11/19] Add integation-tests feature to ci tests --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 7ac0dfa78215..f8a61bac4e2a 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -185,7 +185,7 @@ jobs: with: rust-version: stable - name: Run tests (excluding doctests) - run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace + run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests - name: Verify Working Directory Clean run: git diff --exit-code From a4db8128fa6374d9a032ae41e192bab2cd3c7af4 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 25 Jan 2025 15:22:26 -0500 Subject: [PATCH 12/19] Add integration-tests feature to CI run --- .github/workflows/rust.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f8a61bac4e2a..0bd35fdff4fc 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -253,7 +253,7 @@ jobs: rust-version: stable - name: Run doctests run: | - cargo test --profile ci --doc --features avro,json + cargo test --profile ci --doc --features avro,json,integration-tests cd datafusion-cli cargo test --profile ci --doc --all-features - name: Verify Working Directory Clean @@ -417,7 +417,7 @@ jobs: - name: Run tests (excluding doctests) shell: bash run: | - cargo test --profile ci --lib --tests --bins --features avro,json,backtrace + cargo test --profile ci --lib --tests --bins --features avro,json,backtrace,integration-tests cd datafusion-cli cargo test --profile ci --lib --tests --bins --all-features From f2752ced307cb17ec830e3324b88fec66a6420d7 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 25 Jan 2025 15:55:58 -0500 Subject: [PATCH 13/19] Add clarifying text --- datafusion/ffi/src/lib.rs | 3 ++- datafusion/ffi/src/tests/async_provider.rs | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 366e9fbbe1a3..bef36b1ddd48 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -31,7 +31,8 @@ pub mod tests; /// Returns the major version of the FFI implementation. If the API evolves, /// we use the major version to identify compatibility over the unsafe -/// boundary. +/// boundary. This call is intended to be used by implementers to validate +/// they have compatible libraries. pub extern "C" fn version() -> u64 { let version_str = env!("CARGO_PKG_VERSION"); let version = semver::Version::parse(version_str).expect("Invalid version string"); diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs index 8dbdb4fc3a16..38ddd13952b0 100644 --- a/datafusion/ffi/src/tests/async_provider.rs +++ b/datafusion/ffi/src/tests/async_provider.rs @@ -15,6 +15,16 @@ // specific language governing permissions and limitations // under the License. +//! This is an example of an async table provider that will call functions on +//! the tokio runtime of the library providing the function. Since we cannot +//! share a tokio runtime across the ffi boundary and the producer and consumer +//! may have different runtimes, we need to store a reference to the runtime +//! and enter it during streaming calls. The entering of the runtime will +//! occur by the datafusion_ffi crate during the streaming calls. This code +//! serves as an integration test of this feature. If we do not correctly +//! access the runtime, then you will get a panic when trying to do operations +//! such as spawning a tokio task. + use std::{any::Any, fmt::Debug, sync::Arc}; use crate::table_provider::FFI_TableProvider; @@ -45,7 +55,7 @@ pub struct AsyncTableProvider { _join_handle: Option>, } -fn async_table_provider_task( +fn async_table_provider_thread( mut shutdown: mpsc::Receiver, mut batch_request: mpsc::Receiver, batch_sender: broadcast::Sender>, @@ -92,7 +102,7 @@ pub fn start_async_provider() -> (AsyncTableProvider, Arc) { // in datafusion-python and probably other places. This will let us test that // we do correctly enter the runtime of the foreign provider. let join_handle = Some(std::thread::spawn(move || { - async_table_provider_task( + async_table_provider_thread( shutdown_rx, batch_request_rx, record_batch_tx, From 9e5924c75f1a25442d97f623c1689522d4c13090 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 25 Jan 2025 16:11:41 -0500 Subject: [PATCH 14/19] Update CI to only run integration tests for certain checks --- .github/workflows/rust.yml | 6 +- datafusion/ffi/tests/table_provider.rs | 212 +++++++++++++------------ 2 files changed, 112 insertions(+), 106 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 0bd35fdff4fc..bb78bce402c5 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -60,7 +60,7 @@ jobs: with: rust-version: stable - name: Prepare cargo build - run: cargo check --profile ci --all-targets + run: cargo check --profile ci --all-targets --features integration-tests # cargo check common, functions and substrait with no default features linux-cargo-check-no-default-features: @@ -92,8 +92,8 @@ jobs: - name: Check workspace in debug mode run: cargo check --profile ci --all-targets --workspace - - name: Check workspace with avro,json features - run: cargo check --profile ci --workspace --benches --features avro,json + - name: Check workspace with additional features + run: cargo check --profile ci --workspace --benches --features avro,json,integration-tests - name: Check Cargo.lock for datafusion-cli run: | diff --git a/datafusion/ffi/tests/table_provider.rs b/datafusion/ffi/tests/table_provider.rs index f1c82c252166..9169c9f4221c 100644 --- a/datafusion/ffi/tests/table_provider.rs +++ b/datafusion/ffi/tests/table_provider.rs @@ -15,115 +15,121 @@ // specific language governing permissions and limitations // under the License. -use abi_stable::library::RootModule; -use datafusion::error::{DataFusionError, Result}; -use datafusion::prelude::SessionContext; -use datafusion_ffi::table_provider::ForeignTableProvider; -use datafusion_ffi::tests::TableProviderModuleRef; -use std::path::Path; -use std::sync::Arc; - -/// Compute the path to the library. It would be preferable to simply use -/// abi_stable::library::development_utils::compute_library_path however -/// our current CI pipeline has a `ci` profile that we need to use to -/// find the library. -pub fn compute_library_path( - target_path: &Path, -) -> std::io::Result { - let debug_dir = target_path.join("debug"); - let release_dir = target_path.join("release"); - let ci_dir = target_path.join("ci"); - - let debug_path = M::get_library_path(&debug_dir.join("deps")); - let release_path = M::get_library_path(&release_dir.join("deps")); - let ci_path = M::get_library_path(&ci_dir.join("deps")); - - let all_paths = vec![ - (debug_dir.clone(), debug_path), - (release_dir, release_path), - (ci_dir, ci_path), - ]; - - let best_path = all_paths - .into_iter() - .filter(|(_, path)| path.exists()) - .filter_map(|(dir, path)| path.metadata().map(|m| (dir, m)).ok()) - .filter_map(|(dir, meta)| meta.modified().map(|m| (dir, m)).ok()) - .max_by_key(|(_, date)| *date) - .map(|(dir, _)| dir) - .unwrap_or(debug_dir); - - Ok(best_path) -} - -/// It is important that this test is in the `tests` directory and not in the -/// library directory so we can verify we are building a dynamic library and -/// testing it via a different executable. -async fn test_table_provider(synchronous: bool) -> Result<()> { - let expected_version = datafusion_ffi::version(); - - let crate_root = Path::new(env!("CARGO_MANIFEST_DIR")); - let target_dir = crate_root - .parent() - .expect("Failed to find crate parent") - .parent() - .expect("Failed to find workspace root") - .join("target"); - - // Find the location of the library. This is specific to the build environment, - // so you will need to change the approach here based on your use case. - // let target: &std::path::Path = "../../../../target/".as_ref(); - let library_path = - compute_library_path::(target_dir.as_path()) - .map_err(|e| DataFusionError::External(Box::new(e)))? - .join("deps"); - - // Load the module - let table_provider_module = - TableProviderModuleRef::load_from_directory(&library_path) - .map_err(|e| DataFusionError::External(Box::new(e)))?; - - assert_eq!( - table_provider_module - .version() - .expect("Unable to call version on FFI module")(), - expected_version - ); - - // By calling the code below, the table provided will be created within - // the module's code. - let ffi_table_provider = - table_provider_module - .create_table() - .ok_or(DataFusionError::NotImplemented( +/// Add an additional module here for convenience to scope this to only +/// when the feature integtation-tests is built +#[cfg(feature = "integration-tests")] +mod tests { + + use abi_stable::library::RootModule; + use datafusion::error::{DataFusionError, Result}; + use datafusion::prelude::SessionContext; + use datafusion_ffi::table_provider::ForeignTableProvider; + use datafusion_ffi::tests::TableProviderModuleRef; + use std::path::Path; + use std::sync::Arc; + + /// Compute the path to the library. It would be preferable to simply use + /// abi_stable::library::development_utils::compute_library_path however + /// our current CI pipeline has a `ci` profile that we need to use to + /// find the library. + pub fn compute_library_path( + target_path: &Path, + ) -> std::io::Result { + let debug_dir = target_path.join("debug"); + let release_dir = target_path.join("release"); + let ci_dir = target_path.join("ci"); + + let debug_path = M::get_library_path(&debug_dir.join("deps")); + let release_path = M::get_library_path(&release_dir.join("deps")); + let ci_path = M::get_library_path(&ci_dir.join("deps")); + + let all_paths = vec![ + (debug_dir.clone(), debug_path), + (release_dir, release_path), + (ci_dir, ci_path), + ]; + + let best_path = all_paths + .into_iter() + .filter(|(_, path)| path.exists()) + .filter_map(|(dir, path)| path.metadata().map(|m| (dir, m)).ok()) + .filter_map(|(dir, meta)| meta.modified().map(|m| (dir, m)).ok()) + .max_by_key(|(_, date)| *date) + .map(|(dir, _)| dir) + .unwrap_or(debug_dir); + + Ok(best_path) + } + + /// It is important that this test is in the `tests` directory and not in the + /// library directory so we can verify we are building a dynamic library and + /// testing it via a different executable. + #[cfg(feature = "integration-tests")] + async fn test_table_provider(synchronous: bool) -> Result<()> { + let expected_version = datafusion_ffi::version(); + + let crate_root = Path::new(env!("CARGO_MANIFEST_DIR")); + let target_dir = crate_root + .parent() + .expect("Failed to find crate parent") + .parent() + .expect("Failed to find workspace root") + .join("target"); + + // Find the location of the library. This is specific to the build environment, + // so you will need to change the approach here based on your use case. + // let target: &std::path::Path = "../../../../target/".as_ref(); + let library_path = + compute_library_path::(target_dir.as_path()) + .map_err(|e| DataFusionError::External(Box::new(e)))? + .join("deps"); + + // Load the module + let table_provider_module = + TableProviderModuleRef::load_from_directory(&library_path) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + assert_eq!( + table_provider_module + .version() + .expect("Unable to call version on FFI module")(), + expected_version + ); + + // By calling the code below, the table provided will be created within + // the module's code. + let ffi_table_provider = table_provider_module.create_table().ok_or( + DataFusionError::NotImplemented( "External table provider failed to implement create_table".to_string(), - ))?(synchronous); + ), + )?(synchronous); - // In order to access the table provider within this executable, we need to - // turn it into a `ForeignTableProvider`. - let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into(); + // In order to access the table provider within this executable, we need to + // turn it into a `ForeignTableProvider`. + let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into(); - let ctx = SessionContext::new(); + let ctx = SessionContext::new(); - // Display the data to show the full cycle works. - ctx.register_table("external_table", Arc::new(foreign_table_provider))?; - let df = ctx.table("external_table").await?; - let results = df.collect().await?; + // Display the data to show the full cycle works. + ctx.register_table("external_table", Arc::new(foreign_table_provider))?; + let df = ctx.table("external_table").await?; + let results = df.collect().await?; - assert_eq!(results.len(), 3); - assert_eq!(results[0], datafusion_ffi::tests::create_record_batch(1, 5)); - assert_eq!(results[1], datafusion_ffi::tests::create_record_batch(6, 1)); - assert_eq!(results[2], datafusion_ffi::tests::create_record_batch(7, 5)); + assert_eq!(results.len(), 3); + assert_eq!(results[0], datafusion_ffi::tests::create_record_batch(1, 5)); + assert_eq!(results[1], datafusion_ffi::tests::create_record_batch(6, 1)); + assert_eq!(results[2], datafusion_ffi::tests::create_record_batch(7, 5)); - Ok(()) -} + Ok(()) + } -#[tokio::test] -async fn async_test_table_provider() -> Result<()> { - test_table_provider(false).await -} + #[tokio::test] + async fn async_test_table_provider() -> Result<()> { + test_table_provider(false).await + } -#[tokio::test] -async fn sync_test_table_provider() -> Result<()> { - test_table_provider(true).await + #[tokio::test] + async fn sync_test_table_provider() -> Result<()> { + test_table_provider(true).await + } } From ccf980418d3b1e63646e1d4b601e75310430595f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 26 Jan 2025 08:32:22 -0500 Subject: [PATCH 15/19] When the feature integtation-tests is enabled, we get conflicting library entries for the example table provider and integration test, so disable the example during CI run --- .github/workflows/rust.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index bb78bce402c5..b3165f324767 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -185,7 +185,7 @@ jobs: with: rust-version: stable - name: Run tests (excluding doctests) - run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests + run: cargo test --profile ci --exclude datafusion-examples,ffi_example_table_provider --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests - name: Verify Working Directory Clean run: git diff --exit-code @@ -253,7 +253,7 @@ jobs: rust-version: stable - name: Run doctests run: | - cargo test --profile ci --doc --features avro,json,integration-tests + cargo test --profile ci --doc --features avro,json,integration-tests --exclude ffi_example_table_provider cd datafusion-cli cargo test --profile ci --doc --all-features - name: Verify Working Directory Clean @@ -420,7 +420,7 @@ jobs: cargo test --profile ci --lib --tests --bins --features avro,json,backtrace,integration-tests cd datafusion-cli cargo test --profile ci --lib --tests --bins --all-features - +q test-datafusion-pyarrow: name: cargo test pyarrow (amd64) needs: linux-build-lib From bb2a87a31d900b013d0c14c019cb871d87a850ff Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 26 Jan 2025 18:02:35 -0500 Subject: [PATCH 16/19] Remove typo --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index b3165f324767..e8f5673810c8 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -420,7 +420,7 @@ jobs: cargo test --profile ci --lib --tests --bins --features avro,json,backtrace,integration-tests cd datafusion-cli cargo test --profile ci --lib --tests --bins --all-features -q + test-datafusion-pyarrow: name: cargo test pyarrow (amd64) needs: linux-build-lib From 08f1748aed61dd3b7927fc282a5d4c76de46cde8 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 31 Jan 2025 16:29:39 -0500 Subject: [PATCH 17/19] Specify each excluded crate separately --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index e8f5673810c8..ed3218a9694e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -185,7 +185,7 @@ jobs: with: rust-version: stable - name: Run tests (excluding doctests) - run: cargo test --profile ci --exclude datafusion-examples,ffi_example_table_provider --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests + run: cargo test --profile ci --exclude datafusion-examples --exclude ffi_example_table_provider --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests - name: Verify Working Directory Clean run: git diff --exit-code From 44abeb8a2b9663fabbe1f1e11d351f245f40a2f2 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 31 Jan 2025 17:01:25 -0500 Subject: [PATCH 18/19] Doc tests did not need the exclusion --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ed3218a9694e..087a94a1c909 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -253,7 +253,7 @@ jobs: rust-version: stable - name: Run doctests run: | - cargo test --profile ci --doc --features avro,json,integration-tests --exclude ffi_example_table_provider + cargo test --profile ci --doc --features avro,json,integration-tests cd datafusion-cli cargo test --profile ci --doc --all-features - name: Verify Working Directory Clean From bdf5accd8fce8159962fb3ec4143783c29f009ce Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Fri, 31 Jan 2025 18:53:48 -0500 Subject: [PATCH 19/19] Integration tests shouldn't need doc test --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 087a94a1c909..c0107bc00ecc 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -253,7 +253,7 @@ jobs: rust-version: stable - name: Run doctests run: | - cargo test --profile ci --doc --features avro,json,integration-tests + cargo test --profile ci --doc --features avro,json cd datafusion-cli cargo test --profile ci --doc --all-features - name: Verify Working Directory Clean