From 392c24421789edfba17d9796eece15dc11408461 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 31 Jul 2025 12:59:53 -0400 Subject: [PATCH 1/2] Arrow arrays with non-zero offsets get copied to a new array with a zero offset before FFI to work around possible Arrow Java issue. --- native/core/src/execution/jni_api.rs | 69 ++++++++++++++++++---------- 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index a7ddce34fd..5cb8c08a68 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -18,28 +18,6 @@ //! Define JNI APIs which can be called from Java/Scala. use super::{serde, utils::SparkArrowConvert}; -use arrow::array::RecordBatch; -use arrow::datatypes::DataType as ArrowDataType; -use datafusion::execution::memory_pool::MemoryPool; -use datafusion::{ - execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv}, - physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream}, - prelude::{SessionConfig, SessionContext}, -}; -use futures::poll; -use jni::{ - errors::Result as JNIResult, - objects::{ - JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, JPrimitiveArray, JString, - ReleaseMode, - }, - sys::{jbyteArray, jint, jlong, jlongArray}, - JNIEnv, -}; -use std::path::PathBuf; -use std::time::{Duration, Instant}; -use std::{sync::Arc, task::Poll}; - use crate::{ errors::{try_unwrap_or_throw, CometError, CometResult}, execution::{ @@ -48,19 +26,41 @@ use crate::{ }, jvm_bridge::{jni_new_global_ref, JVMClasses}, }; +use arrow::array::{Array, RecordBatch, UInt32Array}; +use arrow::compute::{take, TakeOptions}; +use arrow::datatypes::DataType as ArrowDataType; use datafusion::common::ScalarValue; use datafusion::execution::disk_manager::DiskManagerMode; +use datafusion::execution::memory_pool::MemoryPool; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use 
datafusion::logical_expr::ScalarUDF; +use datafusion::{ + execution::{disk_manager::DiskManagerBuilder, runtime_env::RuntimeEnv}, + physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream}, + prelude::{SessionConfig, SessionContext}, +}; use datafusion_comet_proto::spark_operator::Operator; use datafusion_spark::function::math::expm1::SparkExpm1; +use futures::poll; use futures::stream::StreamExt; use jni::objects::JByteBuffer; use jni::sys::JNI_FALSE; +use jni::{ + errors::Result as JNIResult, + objects::{ + JByteArray, JClass, JIntArray, JLongArray, JObject, JObjectArray, JPrimitiveArray, JString, + ReleaseMode, + }, + sys::{jbyteArray, jint, jlong, jlongArray}, + JNIEnv, +}; use jni::{ objects::GlobalRef, sys::{jboolean, jdouble, jintArray, jobjectArray, jstring}, }; +use std::path::PathBuf; +use std::time::{Duration, Instant}; +use std::{sync::Arc, task::Poll}; use tokio::runtime::Runtime; use crate::execution::memory_pools::{ @@ -341,10 +341,29 @@ fn prepare_output( let mut i = 0; while i < results.len() { let array_ref = results.get(i).ok_or(CometError::IndexOutOfBounds(i))?; - array_ref - .to_data() - .move_to_spark(array_addrs[i], schema_addrs[i])?; + if array_ref.offset() != 0 { + // https://github.com/apache/datafusion-comet/issues/2051 + // Bug with non-zero offset FFI, so take to a new array which will have an offset of 0. + let indices = UInt32Array::from((0..num_rows as u32).collect::<Vec<u32>>()); + let new_array = take( + array_ref, + &indices, + Some(TakeOptions { + check_bounds: false, + }), + )?; + + debug_assert!(new_array.offset() == 0); + + new_array + .to_data() + .move_to_spark(array_addrs[i], schema_addrs[i])?; + } else { + array_ref + .to_data() + .move_to_spark(array_addrs[i], schema_addrs[i])?; + } i += 1; } } From 14318594fc26dbf916d624d9646d0bda82f0676e Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 31 Jul 2025 14:22:03 -0400 Subject: [PATCH 2/2] Address PR feedback.
--- native/core/src/execution/jni_api.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 5cb8c08a68..a5564c1b4b 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -345,16 +345,15 @@ fn prepare_output( if array_ref.offset() != 0 { // https://github.com/apache/datafusion-comet/issues/2051 // Bug with non-zero offset FFI, so take to a new array which will have an offset of 0. + // We expect this to be a cold code path, hence the check_bounds: true and assert_eq. let indices = UInt32Array::from((0..num_rows as u32).collect::<Vec<u32>>()); let new_array = take( array_ref, &indices, - Some(TakeOptions { - check_bounds: false, - }), + Some(TakeOptions { check_bounds: true }), )?; - debug_assert!(new_array.offset() == 0); + assert_eq!(new_array.offset(), 0); new_array .to_data()