Merged
Changes from 9 commits
1 change: 1 addition & 0 deletions Cargo.toml
@@ -27,6 +27,7 @@ members = [
"datafusion/expr-common",
"datafusion/execution",
"datafusion/ffi",
"datafusion/ffitest",
"datafusion/functions",
"datafusion/functions-aggregate",
"datafusion/functions-aggregate-common",
@@ -53,7 +53,7 @@ extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {

let table_provider = MemTable::try_new(schema, vec![batches]).unwrap();

FFI_TableProvider::new(Arc::new(table_provider), true)
FFI_TableProvider::new(Arc::new(table_provider), true, None)
}

#[export_root_module]
3 changes: 2 additions & 1 deletion datafusion/ffi/Cargo.toml
@@ -44,7 +44,8 @@ datafusion-proto = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
prost = { workspace = true }
semver = "1.0.24"
tokio = { workspace = true }

[dev-dependencies]
doc-comment = { workspace = true }
tokio = { workspace = true }
30 changes: 24 additions & 6 deletions datafusion/ffi/src/execution_plan.rs
@@ -27,6 +27,7 @@ use datafusion::{
execution::{SendableRecordBatchStream, TaskContext},
physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
};
use tokio::runtime::Runtime;

use crate::{
plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
@@ -71,6 +72,7 @@ unsafe impl Sync for FFI_ExecutionPlan {}
pub struct ExecutionPlanPrivateData {
pub plan: Arc<dyn ExecutionPlan>,
pub context: Arc<TaskContext>,
pub runtime: Option<Arc<Runtime>>,
}

unsafe extern "C" fn properties_fn_wrapper(
@@ -88,11 +90,14 @@ unsafe extern "C" fn children_fn_wrapper(
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
let plan = &(*private_data).plan;
let ctx = &(*private_data).context;
let runtime = &(*private_data).runtime;

let children: Vec<_> = plan
.children()
.into_iter()
.map(|child| FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx)))
.map(|child| {
FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
})
.collect();

children.into()
@@ -105,9 +110,10 @@ unsafe extern "C" fn execute_fn_wrapper(
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
let plan = &(*private_data).plan;
let ctx = &(*private_data).context;
let runtime = (*private_data).runtime.as_ref().map(Arc::clone);

match plan.execute(partition, Arc::clone(ctx)) {
Ok(rbs) => RResult::ROk(rbs.into()),
Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)),
Err(e) => RResult::RErr(
format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(),
),
@@ -129,7 +135,11 @@ unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
let plan_data = &(*private_data);

FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), Arc::clone(&plan_data.context))
FFI_ExecutionPlan::new(
Arc::clone(&plan_data.plan),
Arc::clone(&plan_data.context),
plan_data.runtime.clone(),
)
}

impl Clone for FFI_ExecutionPlan {
@@ -140,8 +150,16 @@ impl Clone for FFI_ExecutionPlan {

impl FFI_ExecutionPlan {
/// This function is called on the provider's side.
pub fn new(plan: Arc<dyn ExecutionPlan>, context: Arc<TaskContext>) -> Self {
let private_data = Box::new(ExecutionPlanPrivateData { plan, context });
pub fn new(
plan: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
runtime: Option<Arc<Runtime>>,
) -> Self {
let private_data = Box::new(ExecutionPlanPrivateData {
plan,
context,
runtime,
});

Self {
properties: properties_fn_wrapper,
@@ -357,7 +375,7 @@ mod tests {
let original_plan = Arc::new(EmptyExec::new(schema));
let original_name = original_plan.name().to_string();

let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx());
let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);

let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;

9 changes: 9 additions & 0 deletions datafusion/ffi/src/lib.rs
@@ -26,5 +26,14 @@ pub mod session_config;
pub mod table_provider;
pub mod table_source;

/// Returns the major version of the FFI implementation. If the API evolves,
/// we use the major version to identify compatibility over the unsafe
/// boundary.
pub extern "C" fn version() -> u64 {
let version_str = env!("CARGO_PKG_VERSION");
let version = semver::Version::parse(version_str).expect("Invalid version string");
version.major
}

Reviewer: I don't see this used anywhere in this PR to guard against the unsafe boundary; please ignore if the PR is still WIP.

Member Author

I'm adding an additional note in the description that this function is intended to be called by projects that expose libraries over FFI. Since that may be done in a couple of different ways, such as dynamically linking a library or going through Python, it's up to the downstream users to call this function and check whether they are compatible. Also, since the FFI bindings may not change across some DataFusion versions, multiple versions can be compatible even when they don't share the same major version.
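
To make that intended call pattern concrete, here is a minimal sketch (not part of this PR) of a host-side compatibility check. check_provider_compat is a hypothetical helper; the call through the provider's version field is unsafe because it crosses the FFI boundary, while the host's own datafusion_ffi::version() is an ordinary safe call.

use datafusion_ffi::table_provider::FFI_TableProvider;

/// Hypothetical host-side check: compare the major version reported by a
/// provider built in a separately compiled library against the version of
/// the datafusion-ffi crate compiled into the host.
fn check_provider_compat(provider: &FFI_TableProvider) -> Result<(), String> {
    // Major version of the library that produced this provider.
    let foreign_major = unsafe { (provider.version)() };
    // Major version of the host's own datafusion-ffi crate.
    let local_major = datafusion_ffi::version();
    if foreign_major == local_major {
        Ok(())
    } else {
        Err(format!(
            "incompatible FFI major versions: library {foreign_major}, host {local_major}"
        ))
    }
}

As noted above, strict equality is only one possible policy; a host that knows the bindings did not change across several releases could accept a range of major versions instead.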


#[cfg(doctest)]
doc_comment::doctest!("../README.md", readme_example_test);
28 changes: 24 additions & 4 deletions datafusion/ffi/src/record_batch_stream.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{ffi::c_void, task::Poll};
use std::{ffi::c_void, sync::Arc, task::Poll};

use abi_stable::{
std_types::{ROption, RResult, RString},
@@ -33,6 +33,7 @@ use datafusion::{
execution::{RecordBatchStream, SendableRecordBatchStream},
};
use futures::{Stream, TryStreamExt};
use tokio::runtime::Runtime;

use crate::arrow_wrappers::{WrappedArray, WrappedSchema};

@@ -58,20 +59,36 @@ pub struct FFI_RecordBatchStream {
pub private_data: *mut c_void,
}

pub struct RecordBatchStreamPrivateData {
pub rbs: SendableRecordBatchStream,
pub runtime: Option<Arc<Runtime>>,
}

impl From<SendableRecordBatchStream> for FFI_RecordBatchStream {
fn from(stream: SendableRecordBatchStream) -> Self {
Self::new(stream, None)
}
}

impl FFI_RecordBatchStream {
pub fn new(stream: SendableRecordBatchStream, runtime: Option<Arc<Runtime>>) -> Self {
let private_data = Box::into_raw(Box::new(RecordBatchStreamPrivateData {
rbs: stream,
runtime,
})) as *mut c_void;
FFI_RecordBatchStream {
poll_next: poll_next_fn_wrapper,
schema: schema_fn_wrapper,
private_data: Box::into_raw(Box::new(stream)) as *mut c_void,
private_data,
}
}
}

unsafe impl Send for FFI_RecordBatchStream {}

unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> WrappedSchema {
let stream = stream.private_data as *const SendableRecordBatchStream;
let private_data = stream.private_data as *const RecordBatchStreamPrivateData;
let stream = &(*private_data).rbs;

(*stream).schema().into()
}
@@ -106,7 +123,10 @@ unsafe extern "C" fn poll_next_fn_wrapper(
stream: &FFI_RecordBatchStream,
cx: &mut FfiContext,
) -> FfiPoll<ROption<RResult<WrappedArray, RString>>> {
let stream = stream.private_data as *mut SendableRecordBatchStream;
let private_data = stream.private_data as *mut RecordBatchStreamPrivateData;
let stream = &mut (*private_data).rbs;

let _guard = (*private_data).runtime.as_ref().map(|rt| rt.enter());

let poll_result = cx.with_context(|std_cx| {
(*stream)
21 changes: 18 additions & 3 deletions datafusion/ffi/src/table_provider.rs
@@ -40,6 +40,7 @@ use datafusion_proto::{
protobuf::LogicalExprList,
};
use prost::Message;
use tokio::runtime::Runtime;

use crate::{
arrow_wrappers::WrappedSchema,
@@ -139,6 +140,9 @@ pub struct FFI_TableProvider {
/// Release the memory of the private data when it is no longer being used.
pub release: unsafe extern "C" fn(arg: &mut Self),

/// Return the major DataFusion version number of this provider.
pub version: unsafe extern "C" fn() -> u64,

/// Internal data. This is only to be accessed by the provider of the plan.
/// A [`ForeignExecutionPlan`] should never attempt to access this data.
pub private_data: *mut c_void,
Expand All @@ -149,6 +153,7 @@ unsafe impl Sync for FFI_TableProvider {}

struct ProviderPrivateData {
provider: Arc<dyn TableProvider + Send>,
runtime: Option<Arc<Runtime>>,
}

unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
@@ -216,6 +221,7 @@ unsafe extern "C" fn scan_fn_wrapper(
let private_data = provider.private_data as *mut ProviderPrivateData;
let internal_provider = &(*private_data).provider;
let session_config = session_config.clone();
let runtime = &(*private_data).runtime;

async move {
let config = match ForeignSessionConfig::try_from(&session_config) {
@@ -261,7 +267,11 @@
Err(e) => return RResult::RErr(e.to_string().into()),
};

RResult::ROk(FFI_ExecutionPlan::new(plan, ctx.task_ctx()))
RResult::ROk(FFI_ExecutionPlan::new(
plan,
ctx.task_ctx(),
runtime.clone(),
))
}
.into_ffi()
}
@@ -273,9 +283,11 @@ unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {

unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider {
let old_private_data = provider.private_data as *const ProviderPrivateData;
let runtime = (*old_private_data).runtime.clone();

let private_data = Box::into_raw(Box::new(ProviderPrivateData {
provider: Arc::clone(&(*old_private_data).provider),
runtime,
})) as *mut c_void;

FFI_TableProvider {
@@ -285,6 +297,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider
supports_filters_pushdown: provider.supports_filters_pushdown,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
private_data,
}
}
@@ -300,8 +313,9 @@ impl FFI_TableProvider {
pub fn new(
provider: Arc<dyn TableProvider + Send>,
can_support_pushdown_filters: bool,
runtime: Option<Arc<Runtime>>,
) -> Self {
let private_data = Box::new(ProviderPrivateData { provider });
let private_data = Box::new(ProviderPrivateData { provider, runtime });

Self {
schema: schema_fn_wrapper,
Expand All @@ -313,6 +327,7 @@ impl FFI_TableProvider {
},
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
private_data: Box::into_raw(private_data) as *mut c_void,
}
}
@@ -463,7 +478,7 @@ mod tests {
let provider =
Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?);

let ffi_provider = FFI_TableProvider::new(provider, true);
let ffi_provider = FFI_TableProvider::new(provider, true, None);

let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into();

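
As an aside on the new third argument threaded through the diffs above, here is a minimal sketch (not part of this PR) of a library-side constructor that hands the provider its own tokio runtime. provider_with_runtime is a hypothetical helper and assumes the tokio build enables the multi-threaded runtime.

use std::sync::Arc;

use datafusion::datasource::MemTable;
use datafusion_ffi::table_provider::FFI_TableProvider;
use tokio::runtime::Runtime;

/// Hypothetical library-side constructor that owns a dedicated runtime.
fn provider_with_runtime(table: Arc<MemTable>) -> FFI_TableProvider {
    // poll_next_fn_wrapper enters this runtime when it is provided, so tokio
    // constructs used while polling the stream resolve against it even if the
    // foreign caller never set up a runtime of its own.
    let runtime = Arc::new(Runtime::new().expect("failed to build tokio runtime"));
    FFI_TableProvider::new(table, true, Some(runtime))
}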
47 changes: 47 additions & 0 deletions datafusion/ffitest/Cargo.toml
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-ffi-test"
description = "Integration tests for DataFusion FFI"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
publish = false
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.80.1"

[lints]
workspace = true

[lib]
name = "datafusion_ffi_test"
crate-type = ["cdylib", "rlib"]

[dependencies]
abi_stable = "0.11.3"
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
datafusion = { workspace = true, default-features = false }
datafusion-ffi = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }