Merged
Changes from 1 commit
51 changes: 45 additions & 6 deletions arrow-array/src/builder/generic_bytes_view_builder.rs
@@ -306,15 +306,30 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
/// - String length exceeds `u32::MAX`
#[inline]
pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
+ self.try_append_value(value).unwrap()
+ }
+
+ /// Appends a value into the builder
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if:
+ /// - String buffer count exceeds `u32::MAX`
+ /// - String length exceeds `u32::MAX`
+ #[inline]
+ pub fn try_append_value(&mut self, value: impl AsRef<T::Native>) -> Result<(), ArrowError> {
let v: &[u8] = value.as_ref().as_ref();
- let length: u32 = v.len().try_into().unwrap();
+ let length: u32 = v.len().try_into().map_err(|_| {
+ ArrowError::InvalidArgumentError(format!("String length {} exceeds u32::MAX", v.len()))
+ })?;

if length <= MAX_INLINE_VIEW_LEN {
let mut view_buffer = [0; 16];
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
view_buffer[4..4 + v.len()].copy_from_slice(v);
self.views_buffer.push(u128::from_le_bytes(view_buffer));
self.null_buffer_builder.append_non_null();
- return;
+ return Ok(());
}

// Deduplication if:
@@ -339,7 +354,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
self.views_buffer.push(self.views_buffer[*idx]);
self.null_buffer_builder.append_non_null();
self.string_tracker = Some((ht, hasher));
- return;
+ return Ok(());
}
Entry::Vacant(vacant) => {
// o.w. we insert the (string hash -> view index)
@@ -356,17 +371,41 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
let to_reserve = v.len().max(self.block_size.next_size() as usize);
self.in_progress.reserve(to_reserve);
};
- let offset = self.in_progress.len() as u32;
+ let offset: u32 = self.in_progress.len().try_into().map_err(|_| {
+ ArrowError::InvalidArgumentError(format!(
+ "In-progress buffer length {} exceeds u32::MAX",
Contributor

  1. I think the method can recover by starting a new in-progress buffer instead of returning an error here.

  2. I am unsure if this error is even reachable.

Contributor

I think a new buffer would be allocated in the line immediately above this. Maybe we should do a checked add in `let required_cap = self.in_progress.len() + v.len();` 🤔

To error here, we would need a `usize` that doesn't fit into a `u32`. I think all platforms we care about have a `usize` that is at least `u32` (aka 32-bit architectures)
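
The checked add suggested above could look roughly like the following. This is a minimal sketch under stated assumptions, not the builder's actual code: `required_capacity` is a hypothetical helper, and a plain `String` stands in for `ArrowError` so the snippet only needs the standard library.

```rust
/// Hypothetical helper: capacity needed to append `value_len` bytes to a
/// block that already holds `in_progress_len` bytes.
/// A `String` error stands in for `ArrowError` (assumption for this sketch).
fn required_capacity(in_progress_len: usize, value_len: usize) -> Result<usize, String> {
    // `checked_add` returns `None` on overflow instead of wrapping (release)
    // or panicking (debug), so the overflow can be reported as an error.
    in_progress_len.checked_add(value_len).ok_or_else(|| {
        format!("Required capacity {in_progress_len} + {value_len} overflows usize")
    })
}
```

With this shape, the overflow case surfaces through the `Result` that `try_append_value` already returns.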

Member Author

> To error here, we would need a `usize` that doesn't fit into a `u32`. I think all platforms we care about have a `usize` that is at least `u32` (aka 32-bit architectures)

I think that would be the opposite: a `usize` on a 64-bit arch might not fit into a `u32`. Anyway, I will review and update these changes over the weekend.

Contributor

Yes, you are right -- thank you
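
To make the direction of the conversion concrete: going from `usize` to `u32` can only fail when `usize` is wider than 32 bits. A small sketch, where `fits_in_u32` is a hypothetical helper used only for illustration:

```rust
/// Hypothetical helper: true when `len` can be stored in the `u32` length
/// or offset field of a view.
fn fits_in_u32(len: usize) -> bool {
    // On 32-bit targets every `usize` fits; on 64-bit targets values above
    // `u32::MAX` make `try_from` return an error.
    u32::try_from(len).is_ok()
}
```

On a 64-bit target, `fits_in_u32(u32::MAX as usize + 1)` is `false`, which is the case the `map_err` on the string length guards against.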

Member Author

> I am unsure if this error is even reachable.

I think this is right; I wasn't able to trigger it. For the error to occur, the code would need to skip the `flush_in_progress()` call that happens when the buffer reaches the required capacity.

On top of that, `push_completed`, which `flush_in_progress` uses, asserts on `block.len()` (see below). Therefore `let offset: u32 = self.in_progress.len()` would never be reached with a length that overflows `u32`:

    fn push_completed(&mut self, block: Buffer) {
        assert!(block.len() < u32::MAX as usize, "Block too large");
        assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
        self.completed.push(block);
    }

I'm proceeding by reverting the `map_err` in the offset initialization.
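
The argument above can be illustrated with a toy model. Everything here is an assumption made for the sketch: `ToyBuilder`, its fields, and the fixed `block_size` check are hypothetical and much simpler than the real builder, which grows its block size and checks `Vec` capacity instead.

```rust
/// Toy model of the block handling (hypothetical, std-only).
struct ToyBuilder {
    in_progress: Vec<u8>,
    completed: Vec<Vec<u8>>,
    block_size: usize,
}

impl ToyBuilder {
    fn new(block_size: usize) -> Self {
        // Assumption of this sketch: blocks are far smaller than u32::MAX.
        assert!(block_size < u32::MAX as usize);
        Self { in_progress: Vec::new(), completed: Vec::new(), block_size }
    }

    /// Moves the in-progress block to `completed`, with the same asserts
    /// as the `push_completed` quoted above.
    fn flush_in_progress(&mut self) {
        if !self.in_progress.is_empty() {
            let block = std::mem::take(&mut self.in_progress);
            assert!(block.len() < u32::MAX as usize, "Block too large");
            assert!(self.completed.len() < u32::MAX as usize, "Too many blocks");
            self.completed.push(block);
        }
    }

    /// Appends `v` and returns `(buffer_index, offset)` for its view.
    fn append(&mut self, v: &[u8]) -> (u32, u32) {
        // Start a new block when the value does not fit in the current one.
        if self.in_progress.len() + v.len() > self.block_size {
            self.flush_in_progress();
        }
        // The offset is taken before the value is written, so it is either 0
        // (fresh block) or at most `block_size`; the cast cannot truncate.
        let offset = self.in_progress.len() as u32;
        self.in_progress.extend_from_slice(v);
        (self.completed.len() as u32, offset)
    }
}
```

In this model the offset never exceeds `block_size`, which mirrors why the `map_err` on the offset is not expected to fire.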

+ self.in_progress.len()
+ ))
+ })?;
self.in_progress.extend_from_slice(v);

+ let prefix = v
+ .get(0..4)
+ .and_then(|slice| slice.try_into().ok())
+ .map(u32::from_le_bytes)
+ .ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "String must be at least 4 bytes for non-inline view".to_string(),
Contributor

This error is unreachable as we checked that the value is longer than MAX_INLINE_VIEW_LEN (12 bytes) above.
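
A standalone sketch of the two view encodings shows why the prefix read is safe. `encode_view` is a hypothetical function written for this note; it follows the 16-byte layout used in the diff (length first, then either inline data or prefix, buffer index, and offset, all little-endian) and does not touch the builder's buffers.

```rust
const MAX_INLINE_VIEW_LEN: u32 = 12;

/// Hypothetical helper: encodes a view for `v` as a little-endian `u128`.
/// Returns `None` when the length does not fit in `u32`.
fn encode_view(v: &[u8], buffer_index: u32, offset: u32) -> Option<u128> {
    let length = u32::try_from(v.len()).ok()?;
    let mut bytes = [0u8; 16];
    bytes[0..4].copy_from_slice(&length.to_le_bytes());
    if length <= MAX_INLINE_VIEW_LEN {
        // Short values are stored inline, right after the length.
        bytes[4..4 + v.len()].copy_from_slice(v);
    } else {
        // Here `v.len() > 12`, so `v[0..4]` is always in bounds.
        bytes[4..8].copy_from_slice(&v[0..4]);
        bytes[8..12].copy_from_slice(&buffer_index.to_le_bytes());
        bytes[12..16].copy_from_slice(&offset.to_le_bytes());
    }
    Some(u128::from_le_bytes(bytes))
}
```

Because the non-inline branch is only taken for values longer than 12 bytes, the 4-byte prefix slice cannot fail there.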

+ )
+ })?;
+
+ let buffer_index: u32 = self.completed.len().try_into().map_err(|_| {
+ ArrowError::InvalidArgumentError(format!(
+ "Buffer count {} exceeds u32::MAX",
+ self.completed.len()
+ ))
+ })?;

let view = ByteView {
length,
- prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
- buffer_index: self.completed.len() as u32,
+ prefix,
+ buffer_index,
offset,
};
self.views_buffer.push(view.into());
self.null_buffer_builder.append_non_null();

+ Ok(())
}
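
The overall shape of this change, a fallible `try_` method with the panicking method delegating to it, can be sketched without the arrow crate. `ToyError` and `LengthBuilder` are hypothetical stand-ins; the sketch records only lengths so the error path can be exercised without allocating a 4 GiB string.

```rust
/// Hypothetical error type standing in for `ArrowError`.
#[derive(Debug, PartialEq)]
struct ToyError(String);

/// Hypothetical stand-in for the builder: it only records value lengths.
#[derive(Default)]
struct LengthBuilder {
    lengths: Vec<u32>,
}

impl LengthBuilder {
    /// Fallible variant: reports an oversized length as an error.
    fn try_append_len(&mut self, len: usize) -> Result<(), ToyError> {
        let length = u32::try_from(len)
            .map_err(|_| ToyError(format!("String length {len} exceeds u32::MAX")))?;
        self.lengths.push(length);
        Ok(())
    }

    /// Panicking variant, implemented on top of the fallible one.
    fn append_len(&mut self, len: usize) {
        self.try_append_len(len).unwrap()
    }
}
```

Callers that cannot tolerate a panic use the `try_` variant and handle the `Result`; existing callers of the panicking method keep their behavior.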

/// Append an `Option` value into the builder