161 changes: 156 additions & 5 deletions arrow-ipc/src/writer.rs
@@ -1473,10 +1473,7 @@ fn reencode_offsets<O: OffsetSizeTrait>(
 let offsets = match start_offset.as_usize() {
     0 => {
         let size = size_of::<O>();
-        offsets.slice_with_length(
-            data.offset() * size,
-            (data.offset() + data.len() + 1) * size,
-        )
+        offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
     }
     _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
 };
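
The fix above changes only the length passed to slice_with_length: once the slice begins at byte data.offset() * size, only the data.len() + 1 relevant offsets, i.e. (data.len() + 1) * size bytes, should be taken. The removed code counted data.offset() into the length a second time, so it ran past the end of the buffer for any non-zero data offset. A minimal sketch of the length arithmetic (not the library code itself), assuming i32 offsets:

    fn main() {
        // Offsets buffer for four lists: [0, 2, 5, 7, 9] -- five i32 entries, 20 bytes.
        // Slicing the array to offset = 1, len = 3 needs entries 1..=4, i.e. bytes 4..20.
        let size = std::mem::size_of::<i32>(); // 4 bytes per offset
        let (offset, len) = (1usize, 3usize);
        let start_byte = offset * size; // 4
        let fixed_len = (len + 1) * size; // 16 -> bytes 4..20, within the 20-byte buffer
        let old_len = (offset + len + 1) * size; // 20 -> bytes 4..24, past the end
        assert!(start_byte + fixed_len <= 20);
        assert!(start_byte + old_len > 20);
    }
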
@@ -1793,9 +1790,9 @@ mod tests {
 use std::io::Cursor;
 use std::io::Seek;

-use arrow_array::builder::GenericListBuilder;
 use arrow_array::builder::MapBuilder;
 use arrow_array::builder::UnionBuilder;
+use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
 use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
 use arrow_array::types::*;
 use arrow_buffer::ScalarBuffer;
@@ -2433,6 +2430,126 @@ mod tests {
);
}

#[test]
fn test_large_slice_uint32() {
ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
if i % 2 == 0 {
Some(i)
} else {
None
}
}))));
}

#[test]
fn test_large_slice_string() {
let strings: Vec<_> = (0..8000)
.map(|i| {
if i % 2 == 0 {
Some(format!("value{}", i))
} else {
None
}
})
.collect();

ensure_roundtrip(Arc::new(StringArray::from(strings)));
}

#[test]
fn test_large_slice_string_list() {
let mut ls = ListBuilder::new(StringBuilder::new());

let mut s = String::new();
for row_number in 0..8000 {
if row_number % 2 == 0 {
for list_element in 0..1000 {
s.clear();
use std::fmt::Write;
write!(&mut s, "value{row_number}-{list_element}").unwrap();
ls.values().append_value(&s);
}
ls.append(true)
} else {
ls.append(false); // null
}
}

ensure_roundtrip(Arc::new(ls.finish()));
}

#[test]
fn test_large_slice_string_list_of_lists() {
// The reason for this special test is to exercise reencode_offsets, which looks at both
// the starting offset and the data offset, so we need a dataset where the starting
// offset is zero but the data offset is not.
Review comment (Contributor):

👍 Ah, Lists of Lists 🤯

I verified this fails without the changes in this PR like this:

    thread 'writer::tests::test_large_slice_string_list_of_lists' panicked at arrow-buffer/src/buffer/immutable.rs:281:9:
    the offset of the new Buffer cannot exceed the existing length: slice offset=4 length=24004 selflen=24004

💯
let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));

for _ in 0..4000 {
ls.values().append(true);
ls.append(true)
}

let mut s = String::new();
for row_number in 0..4000 {
if row_number % 2 == 0 {
for list_element in 0..1000 {
s.clear();
use std::fmt::Write;
write!(&mut s, "value{row_number}-{list_element}").unwrap();
ls.values().values().append_value(&s);
}
ls.values().append(true);
ls.append(true)
} else {
ls.append(false); // null
}
}

ensure_roundtrip(Arc::new(ls.finish()));
}

/// Read/write a record batch to a File and a Stream and ensure it is the same at the output
fn ensure_roundtrip(array: ArrayRef) {
let num_rows = array.len();
let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
// take off the first element
let sliced_batch = orig_batch.slice(1, num_rows - 1);

let schema = orig_batch.schema();
let stream_data = {
let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
writer.write(&sliced_batch).unwrap();
writer.into_inner().unwrap()
};
let read_batch = {
let projection = None;
let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
reader
.next()
.expect("expect no errors reading batch")
.expect("expect batch")
};
assert_eq!(sliced_batch, read_batch);

let file_data = {
let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
writer.write(&sliced_batch).unwrap();
writer.into_inner().unwrap().into_inner().unwrap()
};
let read_batch = {
let projection = None;
let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
reader
.next()
.expect("expect no errors reading batch")
.expect("expect batch")
};
assert_eq!(sliced_batch, read_batch);
}
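
A usage sketch mirroring the tests above; any ArrayRef with at least one row works, since the helper slices off the first row before round-tripping:

    ensure_roundtrip(Arc::new(UInt32Array::from(vec![1, 2, 3])));
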

#[test]
fn encode_bools_slice() {
// Test case for https://github.com/apache/arrow-rs/issues/3496
@@ -2615,6 +2732,40 @@ mod tests {
builder.finish()
}

#[test]
fn reencode_offsets_when_first_offset_is_not_zero() {
Review comment (Contributor):

I verified that this test fails without the changes in this PR:

    thread 'writer::tests::reencode_offsets_when_first_offset_is_zero' panicked at arrow-buffer/src/buffer/immutable.rs:281:9:
    the offset of the new Buffer cannot exceed the existing length: slice offset=4 length=12 selflen=12
    stack backtrace:
       0: rust_begin_unwind
                 at /rustc/9fc6b43126469e3858e2fe86cafb4f0fd5068869/library/std/src/panicking.rs:665:5
       1: core::panicking::panic_fmt
                 at /rustc/9fc6b43126469e3858e2fe86cafb4f0fd5068869/library/core/src/panicking.rs:76:14
       2: arrow_buffer::buffer::immutable::Buffer::slice_with_length
                 at /Users/andrewlamb/Software/arrow-rs/arrow-buffer/src/buffer/immutable.rs:281:9
       3: arrow_ipc::writer::reencode_offsets
                 at ./src/writer.rs:1476:13
       4: arrow_ipc::writer::tests::reencode_offsets_when_first_offset_is_zero
                 at ./src/writer.rs:2641:53
       5: arrow_ipc::writer::tests::reencode_offsets_when_first_offset_is_zero::{{closure}}
                 at ./src/writer.rs:2630:52

let original_list = generate_list_data::<i32>();
let original_data = original_list.into_data();
let slice_data = original_data.slice(75, 7);
let (new_offsets, original_start, length) =
reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
assert_eq!(
vec![0, 3, 6, 9, 12, 15, 18, 21],
new_offsets.typed_data::<i32>()
);
assert_eq!(225, original_start);
assert_eq!(21, length);
}
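
The expected values are consistent with generate_list_data producing lists of three elements each: the slice starting at row 75 sees a first offset of 75 * 3 = 225 and 7 * 3 = 21 child values, and the non-zero arm of reencode_offsets rebases each offset by subtracting that first value. A standalone sketch of the rebasing, with the offsets reconstructed from the expected values rather than taken from generate_list_data itself:

    fn main() {
        // Offset entries 75..=82, three child values per list: [225, 228, ..., 246].
        let offsets: Vec<i32> = (75..=82).map(|i| i * 3).collect();
        let start = offsets[0];
        let rebased: Vec<i32> = offsets.iter().map(|x| x - start).collect();
        assert_eq!(rebased, vec![0, 3, 6, 9, 12, 15, 18, 21]);
    }
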

#[test]
fn reencode_offsets_when_first_offset_is_zero() {
let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
// ls = [[], [35, 42]]
ls.append(true);
ls.values().append_value(35);
ls.values().append_value(42);
ls.append(true);
let original_list = ls.finish();
let original_data = original_list.into_data();

let slice_data = original_data.slice(1, 1);
let (new_offsets, original_start, length) =
reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
assert_eq!(0, original_start);
assert_eq!(2, length);
}
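
Together, the two tests cover both arms of the match in reencode_offsets shown at the top of this diff: this one hits the 0 => arm, because after slice(1, 1) the first visible offset of [[], [35, 42]] is 0 even though data.offset() is 1. A sketch of that arm's byte arithmetic, assuming i32 offsets as in the test:

    fn main() {
        // The offsets buffer for [[], [35, 42]] is [0, 0, 2]. After slice(1, 1) the
        // slice starts at entry 1, whose value is 0, so the buffer is sliced
        // directly rather than rebased.
        let size = std::mem::size_of::<i32>(); // 4
        let (offset, len) = (1usize, 1usize);
        let start_byte = offset * size; // 4 -> skip the first entry
        let byte_len = (len + 1) * size; // 8 -> keep the two entries [0, 2]
        assert_eq!((start_byte, byte_len), (4, 8));
    }
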

/// Ensure that the full and sliced versions survive a serde round trip equal to the original input.
/// Also ensure the serialized sliced version is significantly smaller than the serialized full version.
fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {