Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ on:
- arrow/**

jobs:

integration:
name: Archery test With other arrows
runs-on: ubuntu-latest
Expand Down Expand Up @@ -118,9 +117,9 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
rust: [ stable ]
# PyArrow 13 was the last version prior to introduction to Arrow PyCapsules
pyarrow: [ "13", "14" ]
rust: [stable]
# PyArrow 15 was the first version to introduce StringView/BinaryView support
pyarrow: ["15", "16", "17"]
steps:
- uses: actions/checkout@v4
with:
Expand Down
59 changes: 49 additions & 10 deletions arrow-array/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
"The datatype \"{data_type:?}\" expects 3 buffers, but requested {i}. Please verify that the C data interface is correctly implemented."
)))
}
// Variable-sized views have 3 or more buffers:
// Buffer 1 is the u128 views buffer.
// Buffers 2..N-1 are u8 byte buffers.
(DataType::Utf8View, 1) | (DataType::BinaryView,1) => u128::BITS as _,
(DataType::Utf8View, _) | (DataType::BinaryView, _) => {
u8::BITS as _
}
// type ids. UnionArray doesn't have null bitmap so buffer index begins with 0.
(DataType::Union(_, _), 0) => i8::BITS as _,
// Only DenseUnion has 2nd buffer
Expand Down Expand Up @@ -300,7 +307,7 @@ impl<'a> ImportedArrowArray<'a> {
};

let data_layout = layout(&self.data_type);
let buffers = self.buffers(data_layout.can_contain_null_mask)?;
let buffers = self.buffers(data_layout.can_contain_null_mask, data_layout.variadic)?;

let null_bit_buffer = if data_layout.can_contain_null_mask {
self.null_bit_buffer()
Expand Down Expand Up @@ -373,13 +380,30 @@ impl<'a> ImportedArrowArray<'a> {

/// returns all buffers, as organized by Rust (i.e. null buffer is skipped if it's present
/// in the spec of the type)
fn buffers(&self, can_contain_null_mask: bool) -> Result<Vec<Buffer>> {
fn buffers(&self, can_contain_null_mask: bool, variadic: bool) -> Result<Vec<Buffer>> {
// + 1: skip null buffer
let buffer_begin = can_contain_null_mask as usize;
(buffer_begin..self.array.num_buffers())
.map(|index| {
let len = self.buffer_len(index, &self.data_type)?;
let buffer_end = self.array.num_buffers() - usize::from(variadic);

let variadic_buffer_lens = if variadic {
// Each views array has 1 (optional) null buffer, 1 views buffer, and 1 lengths buffer.
// The rest are variadic.
let num_variadic_buffers =
self.array.num_buffers() - (2 + usize::from(can_contain_null_mask));
if num_variadic_buffers == 0 {
&[]
} else {
let lengths = self.array.buffer(self.array.num_buffers() - 1);
// SAFETY: if lengths is non-null, then it must be valid for at least num_variadic_buffers values.
unsafe { std::slice::from_raw_parts(lengths.cast::<i64>(), num_variadic_buffers) }
}
} else {
&[]
};

(buffer_begin..buffer_end)
.map(|index| {
let len = self.buffer_len(index, variadic_buffer_lens, &self.data_type)?;
match unsafe { create_buffer(self.owner.clone(), self.array, index, len) } {
Some(buf) => Ok(buf),
None if len == 0 => {
Expand All @@ -399,7 +423,12 @@ impl<'a> ImportedArrowArray<'a> {
/// Rust implementation uses fixed-sized buffers, which require knowledge of their `len`.
/// For variable-sized buffers, such as the second buffer of a StringArray, we need
/// to fetch the offset buffer's len to build the second buffer.
fn buffer_len(&self, i: usize, dt: &DataType) -> Result<usize> {
fn buffer_len(
&self,
i: usize,
variadic_buffer_lengths: &[i64],
dt: &DataType,
) -> Result<usize> {
// Special handling for dictionary type as we only care about the key type in the case.
let data_type = match dt {
DataType::Dictionary(key_data_type, _) => key_data_type.as_ref(),
Expand Down Expand Up @@ -430,7 +459,7 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i32`, as Utf8 uses `i32` offsets.
#[allow(clippy::cast_ptr_alignment)]
Expand All @@ -444,14 +473,24 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i64`, as Large uses `i64` offsets.
#[allow(clippy::cast_ptr_alignment)]
let offset_buffer = self.array.buffer(1) as *const i64;
// get last offset
(unsafe { *offset_buffer.add(len / size_of::<i64>() - 1) }) as usize
}
// View types: these have variadic buffers.
// Buffer 1 is the views buffer, which stores one u128 per element of the array.
// Buffers 2..N-1 are the buffers holding the byte data. Their lengths are variable.
// Buffer N is of length (N - 2) and stores i64 values containing the lengths of buffers 2..N-1.
(DataType::Utf8View, 1) | (DataType::BinaryView, 1) => {
std::mem::size_of::<u128>() * length
}
(DataType::Utf8View, i) | (DataType::BinaryView, i) => {
variadic_buffer_lengths[i - 2] as usize
}
// buffer len of primitive types
_ => {
let bits = bit_width(data_type, i)?;
Expand Down Expand Up @@ -1453,8 +1492,8 @@ mod tests_from_ffi {
owner: &array,
};

let offset_buf_len = imported_array.buffer_len(1, &imported_array.data_type)?;
let data_buf_len = imported_array.buffer_len(2, &imported_array.data_type)?;
let offset_buf_len = imported_array.buffer_len(1, &[], &imported_array.data_type)?;
let data_buf_len = imported_array.buffer_len(2, &[], &imported_array.data_type)?;

assert_eq!(offset_buf_len, 4);
assert_eq!(data_buf_len, 0);
Expand Down
7 changes: 5 additions & 2 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl Buffer {
pub fn advance(&mut self, offset: usize) {
assert!(
offset <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: offset={} length={}",
offset,
self.length
);
self.length -= offset;
// Safety:
Expand All @@ -221,7 +223,8 @@ impl Buffer {
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
assert!(
offset.saturating_add(length) <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: slice offset={offset} length={length} selflen={}",
self.length
);
// Safety:
// offset + length <= self.length
Expand Down
20 changes: 16 additions & 4 deletions arrow-data/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::bit_mask::set_bits;
use crate::{layout, ArrayData};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_buffer::{Buffer, MutableBuffer, ScalarBuffer};
use arrow_schema::DataType;
use std::ffi::c_void;

Expand Down Expand Up @@ -121,7 +121,7 @@ impl FFI_ArrowArray {
pub fn new(data: &ArrayData) -> Self {
let data_layout = layout(data.data_type());

let buffers = if data_layout.can_contain_null_mask {
let mut buffers = if data_layout.can_contain_null_mask {
// * insert the null buffer at the start
// * make all others `Option<Buffer>`.
std::iter::once(align_nulls(data.offset(), data.nulls()))
Expand All @@ -132,7 +132,7 @@ impl FFI_ArrowArray {
};

// `n_buffers` is the number of buffers by the spec.
let n_buffers = {
let mut n_buffers = {
data_layout.buffers.len() + {
// If the layout has a null buffer by Arrow spec.
// Note that even the array doesn't have a null buffer because it has
Expand All @@ -141,10 +141,22 @@ impl FFI_ArrowArray {
}
} as i64;

if data_layout.variadic {
// Save the lengths of all variadic buffers into a new buffer.
// The first buffer is `views`, and the rest are variadic.
let mut data_buffers_lengths = Vec::new();
for buffer in data.buffers().iter().skip(1) {
data_buffers_lengths.push(buffer.len() as i64);
n_buffers += 1;
}

buffers.push(Some(ScalarBuffer::from(data_buffers_lengths).into_inner()));
n_buffers += 1;
}

let buffers_ptr = buffers
.iter()
.flat_map(|maybe_buffer| match maybe_buffer {
// note that `raw_data` takes into account the buffer's offset
Some(b) => Some(b.as_ptr() as *const c_void),
// This is for null buffer. We only put a null pointer for
// null buffer if by spec it can contain null mask.
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl FromPyArrow for RecordBatch {
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
let array_data = unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
if !matches!(array_data.data_type(), DataType::Struct(_)) {
return Err(PyTypeError::new_err(
Expand Down
5 changes: 4 additions & 1 deletion arrow/tests/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use arrow::record_batch::RecordBatch;
use arrow_array::StringViewArray;
use pyo3::Python;
use std::sync::Arc;

Expand All @@ -27,7 +28,9 @@ fn test_to_pyarrow() {

let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
// The "very long string" will not be inlined, and force the creation of a data buffer.
let c: ArrayRef = Arc::new(StringViewArray::from(vec!["short", "a very long string"]));
let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
println!("input: {:?}", input);

let res = Python::with_gil(|py| {
Expand Down