diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs index 6e3d7686a1d..137688a6e8b 100644 --- a/arrow-buffer/src/buffer/boolean.rs +++ b/arrow-buffer/src/buffer/boolean.rs @@ -15,12 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::bit_chunk_iterator::BitChunks; +use crate::bit_chunk_iterator::{BitChunks, UnalignedBitChunk}; use crate::bit_iterator::{BitIndexIterator, BitIndexU32Iterator, BitIterator, BitSliceIterator}; -use crate::{ - BooleanBufferBuilder, Buffer, MutableBuffer, bit_util, buffer_bin_and, buffer_bin_or, - buffer_bin_xor, buffer_unary_not, -}; +use crate::bit_util::read_u64; +use crate::{BooleanBufferBuilder, Buffer, MutableBuffer, bit_util}; use std::ops::{BitAnd, BitOr, BitXor, Not}; @@ -73,7 +71,7 @@ use std::ops::{BitAnd, BitOr, BitXor, Not}; /// [`NullBuffer`]: crate::NullBuffer #[derive(Debug, Clone, Eq)] pub struct BooleanBuffer { - /// Underlying buffer (byte aligned) + /// Underlying buffer buffer: Buffer, /// Offset in bits (not bytes) bit_offset: usize, @@ -151,7 +149,10 @@ impl BooleanBuffer { /// let result = BooleanBuffer::from_bits(&input, 4, 12); /// assert_eq!(result.values(), &[0b10101100u8, 0b00001011u8]); pub fn from_bits(src: impl AsRef<[u8]>, offset_in_bits: usize, len_in_bits: usize) -> Self { - Self::from_bitwise_unary_op(src, offset_in_bits, len_in_bits, |a| a) + let chunks = BitChunks::new(src.as_ref(), offset_in_bits, len_in_bits); + let iter = chunks.iter_padded(); + let buffer = unsafe { MutableBuffer::from_trusted_len_iter(iter) }; + BooleanBuffer::new(buffer.into(), 0, len_in_bits) } /// Create a new [`BooleanBuffer`] by applying the bitwise operation to `op` @@ -166,7 +167,7 @@ impl BooleanBuffer { /// on the relevant bits; the input `u64` may contain irrelevant bits /// and may be processed differently on different endian architectures. /// * `op` may be called with input bits outside the requested range - /// * The output always has zero offset + /// * The output may have a non-zero bit offset IF the source buffer is not 64-bit aligned. 
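+    ///   (concretely, the output bit offset is `offset_in_bits % 8`, i.e. non-zero only when the source offset is not byte aligned)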
/// /// # See Also /// - [`BooleanBuffer::from_bitwise_binary_op`] to create a new buffer from a binary operation @@ -180,7 +181,8 @@ impl BooleanBuffer { /// let result = BooleanBuffer::from_bitwise_unary_op( /// &input, 0, 12, |a| !a /// ); - /// assert_eq!(result.values(), &[0b00110011u8, 0b11110101u8]); + /// // Values are padded + /// assert_eq!(result.values(), &[0b00110011u8, 0b11110101u8, 255, 255, 255, 255, 255, 255]); /// ``` pub fn from_bitwise_unary_op( src: impl AsRef<[u8]>, @@ -191,67 +193,21 @@ impl BooleanBuffer { where F: FnMut(u64) -> u64, { - // try fast path for aligned input - if offset_in_bits & 0x7 == 0 { - // align to byte boundary - let aligned = &src.as_ref()[offset_in_bits / 8..]; - if let Some(result) = - Self::try_from_aligned_bitwise_unary_op(aligned, len_in_bits, &mut op) - { - return result; - } - } - - let chunks = BitChunks::new(src.as_ref(), offset_in_bits, len_in_bits); - let mut result = MutableBuffer::with_capacity(chunks.num_u64s() * 8); - for chunk in chunks.iter() { - // SAFETY: reserved enough capacity above, (exactly num_u64s() - // items) and we assume `BitChunks` correctly reports upper bound - unsafe { - result.push_unchecked(op(chunk)); - } - } - if chunks.remainder_len() > 0 { - debug_assert!(result.capacity() >= result.len() + 8); // should not reallocate - // SAFETY: reserved enough capacity above, (exactly num_u64s() - // items) and we assume `BitChunks` correctly reports upper bound - unsafe { - result.push_unchecked(op(chunks.remainder_bits())); - } - // Just pushed one u64, which may have trailing zeros - result.truncate(chunks.num_bytes()); - } - - BooleanBuffer { - buffer: Buffer::from(result), - bit_offset: 0, - bit_len: len_in_bits, - } - } - - /// Fast path for [`Self::from_bitwise_unary_op`] when input is aligned to - /// 8-byte (64-bit) boundaries - /// - /// Returns None if the fast path cannot be taken - fn try_from_aligned_bitwise_unary_op( - src: &[u8], - len_in_bits: usize, - op: &mut F, - ) -> Option - where - F: FnMut(u64) -> u64, - { - // Safety: all valid bytes are valid u64s - let (prefix, aligned_u6us, suffix) = unsafe { src.align_to::() }; - if !(prefix.is_empty() && suffix.is_empty()) { - // Couldn't make this case any faster than the default path, see - // https://github.com/apache/arrow-rs/pull/8996/changes#r2620022082 - return None; - } - // the buffer is word (64 bit) aligned, so use optimized Vec code. 
- let result_u64s: Vec = aligned_u6us.iter().map(|l| op(*l)).collect(); - let buffer = Buffer::from(result_u64s); - Some(BooleanBuffer::new(buffer, 0, len_in_bits)) + let end = offset_in_bits + len_in_bits; + let start_bit = offset_in_bits % 8; + // align to byte boundaries + let aligned = &src.as_ref()[offset_in_bits / 8..bit_util::ceil(end, 8)]; + // Use unaligned code path, handle remainder bytes + let chunks = aligned.chunks_exact(8); + let remainder = chunks.remainder(); + let iter = chunks.map(|c| u64::from_le_bytes(c.try_into().unwrap())); + let vec_u64s: Vec = if remainder.is_empty() { + iter.map(&mut op).collect() + } else { + iter.chain(Some(read_u64(remainder))).map(&mut op).collect() + }; + + return BooleanBuffer::new(Buffer::from(vec_u64s), start_bit, len_in_bits); } /// Create a new [`BooleanBuffer`] by applying the bitwise operation `op` to @@ -276,7 +232,8 @@ impl BooleanBuffer { /// let result = BooleanBuffer::from_bitwise_binary_op( /// &left, 0, &right, 0, 12, |a, b| a & b /// ); - /// assert_eq!(result.inner().as_slice(), &[0b10001000u8, 0b00001000u8]); + /// // Note: the output is padded to the next u64 boundary + /// assert_eq!(result.inner().as_slice(), &[0b10001000u8, 0b00001000u8, 0, 0, 0, 0, 0, 0]); /// ``` /// /// # Example: Create new [`BooleanBuffer`] from bitwise `OR` of two byte slices @@ -288,7 +245,8 @@ impl BooleanBuffer { /// let result = BooleanBuffer::from_bitwise_binary_op( /// &left, 4, &right, 0, 12, |a, b| a | b /// ); - /// assert_eq!(result.inner().as_slice(), &[0b10101110u8, 0b00001111u8]); + /// // Note: the output is padded to the next u64 boundary + /// assert_eq!(result.inner().as_slice(), &[0b10101110u8, 0b00001111u8, 0, 0, 0, 0, 0, 0]); /// ``` pub fn from_bitwise_binary_op( left: impl AsRef<[u8]>, @@ -303,6 +261,15 @@ impl BooleanBuffer { { let left = left.as_ref(); let right = right.as_ref(); + + if left.len() < len_in_bits / 8 { + panic!("The left buffer is too small for the specified length"); + } + + if right.len() < len_in_bits / 8 { + panic!("The right buffer is too small for the specified length"); + } + // try fast path for aligned input // If the underlying buffers are aligned to u64 we can apply the operation directly on the u64 slices // to improve performance. 
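An aside on the word-at-a-time pattern the new unary and binary paths share: the byte-aligned region is read as little-endian `u64` words, `op` runs once per word, and a trailing partial word is zero-padded before the call (what `read_u64` provides). A minimal standalone sketch of that pattern, using the illustrative names `bitmap_unary_words` and `read_padded` rather than crate APIs:

fn bitmap_unary_words(src: &[u8], mut op: impl FnMut(u64) -> u64) -> Vec<u64> {
    // Zero-pad a trailing partial word, mirroring a `read_u64`-style helper.
    let read_padded = |bytes: &[u8]| {
        let mut buf = [0u8; 8];
        buf[..bytes.len()].copy_from_slice(bytes);
        u64::from_le_bytes(buf)
    };
    let chunks = src.chunks_exact(8);
    let remainder = chunks.remainder();
    let mut out: Vec<u64> = chunks
        .map(|c| op(u64::from_le_bytes(c.try_into().unwrap())))
        .collect();
    if !remainder.is_empty() {
        out.push(op(read_padded(remainder)));
    }
    out
}

fn main() {
    // Negate a 12-bit bitmap: two input bytes become one padded output word.
    let out = bitmap_unary_words(&[0b11001100u8, 0b00001010u8], |w| !w);
    assert_eq!(out.len(), 1);
    assert_eq!(out[0] as u8, 0b00110011); // first byte inverted
    assert_eq!((out[0] >> 8) as u8, 0b11110101); // second byte inverted
    assert_eq!(out[0] >> 16, u64::MAX >> 16); // zero padding becomes ones under `!`
}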
@@ -336,28 +303,60 @@ impl BooleanBuffer { } } } + + if left_offset_in_bits == right_offset_in_bits { + // is aligned to byte boundary + if left_offset_in_bits & 0x7 == 0 { + let left = &left[left_offset_in_bits / 8..]; + let right = &right[right_offset_in_bits / 8..]; + + let left_chunks = BitChunks::new(left, 0, len_in_bits); + let right_chunks = BitChunks::new(right, 0, len_in_bits); + let mut result = Vec::with_capacity(bit_util::ceil(len_in_bits, 64)); + + let l_iter = left + .chunks_exact(8) + .map(|c| u64::from_le_bytes(c.try_into().unwrap())); + let r_iter = right + .chunks_exact(8) + .map(|c| u64::from_le_bytes(c.try_into().unwrap())); + + result.extend(l_iter.zip(r_iter).map(|(l, r)| op(l, r))); + + if left_chunks.remainder_len() > 0 { + result.push(op( + left_chunks.remainder_bits(), + right_chunks.remainder_bits(), + )); + } + return BooleanBuffer::new(Buffer::from(result), 0, len_in_bits); + } + + // both buffers have the same offset, we can use UnalignedBitChunk for both + let left_chunks = UnalignedBitChunk::new(left, left_offset_in_bits, len_in_bits); + let right_chunks = UnalignedBitChunk::new(right, right_offset_in_bits, len_in_bits); + + let chunks = left_chunks + .zip(&right_chunks) + .map(|(left, right)| op(left, right)); + // Soundness: `UnalignedBitChunk` is a `BitChunks` trusted length iterator which + // correctly reports its upper bound + let buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) }; + + return BooleanBuffer::new(buffer.into(), left_chunks.lead_padding(), len_in_bits); + } + let left_chunks = BitChunks::new(left, left_offset_in_bits, len_in_bits); let right_chunks = BitChunks::new(right, right_offset_in_bits, len_in_bits); let chunks = left_chunks - .iter() - .zip(right_chunks.iter()) + .zip_padded(&right_chunks) .map(|(left, right)| op(left, right)); // Soundness: `BitChunks` is a `BitChunks` trusted length iterator which // correctly reports its upper bound - let mut buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) }; - - let remainder_bytes = bit_util::ceil(left_chunks.remainder_len(), 8); - let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits()); - // we are counting its starting from the least significant bit, to to_le_bytes should be correct - let rem = &rem.to_le_bytes()[0..remainder_bytes]; - buffer.extend_from_slice(rem); + let buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) }; - BooleanBuffer { - buffer: Buffer::from(buffer), - bit_offset: 0, - bit_len: len_in_bits, - } + BooleanBuffer::new(buffer.into(), 0, len_in_bits) } /// Returns the number of set bits in this buffer @@ -506,11 +505,7 @@ impl Not for &BooleanBuffer { type Output = BooleanBuffer; fn not(self) -> Self::Output { - BooleanBuffer { - buffer: buffer_unary_not(&self.buffer, self.bit_offset, self.bit_len), - bit_offset: 0, - bit_len: self.bit_len, - } + BooleanBuffer::from_bitwise_unary_op(&self.buffer, self.bit_offset, self.bit_len, |a| !a) } } @@ -519,17 +514,14 @@ impl BitAnd<&BooleanBuffer> for &BooleanBuffer { fn bitand(self, rhs: &BooleanBuffer) -> Self::Output { assert_eq!(self.bit_len, rhs.bit_len); - BooleanBuffer { - buffer: buffer_bin_and( - &self.buffer, - self.bit_offset, - &rhs.buffer, - rhs.bit_offset, - self.bit_len, - ), - bit_offset: 0, - bit_len: self.bit_len, - } + BooleanBuffer::from_bitwise_binary_op( + &self.buffer, + self.bit_offset, + &rhs.buffer, + rhs.bit_offset, + self.bit_len, + |a, b| a & b, + ) } } @@ -538,17 +530,14 @@ impl BitOr<&BooleanBuffer> for &BooleanBuffer { fn bitor(self, rhs: 
&BooleanBuffer) -> Self::Output { assert_eq!(self.bit_len, rhs.bit_len); - BooleanBuffer { - buffer: buffer_bin_or( - &self.buffer, - self.bit_offset, - &rhs.buffer, - rhs.bit_offset, - self.bit_len, - ), - bit_offset: 0, - bit_len: self.bit_len, - } + BooleanBuffer::from_bitwise_binary_op( + &self.buffer, + self.bit_offset, + &rhs.buffer, + rhs.bit_offset, + self.bit_len, + |a, b| a | b, + ) } } @@ -557,17 +546,14 @@ impl BitXor<&BooleanBuffer> for &BooleanBuffer { fn bitxor(self, rhs: &BooleanBuffer) -> Self::Output { assert_eq!(self.bit_len, rhs.bit_len); - BooleanBuffer { - buffer: buffer_bin_xor( - &self.buffer, - self.bit_offset, - &rhs.buffer, - rhs.bit_offset, - self.bit_len, - ), - bit_offset: 0, - bit_len: self.bit_len, - } + BooleanBuffer::from_bitwise_binary_op( + &self.buffer, + self.bit_offset, + &rhs.buffer, + rhs.bit_offset, + self.bit_len, + |a, b| a ^ b, + ) } } diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index 7bf67503562..b604e8f5428 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -1029,7 +1029,10 @@ mod tests { let assert_preserved = |offset: usize, len: usize| { let new_buf = buf.bit_slice(offset, len); - assert_eq!(new_buf.len(), bit_util::ceil(len, 8)); + assert_eq!( + new_buf.len(), + bit_util::round_upto_multiple_of_64(bit_util::ceil(len, 8)) + ); // if the offset is not byte-aligned, we have to create a deep copy to a new buffer // (since the `offset` value inside a Buffer is byte-granular, not bit-granular), so diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 9fc86050619..097a7b35aaa 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -872,15 +872,16 @@ impl MutableBuffer { let mut buffer = MutableBuffer::new(len); - let mut dst = buffer.data.as_ptr(); - for item in iterator { + let mut dst = buffer.data.as_ptr() as *mut T; + iterator.for_each(|item| { // note how there is no reserve here (compared with `extend_from_iter`) - let src = item.to_byte_slice().as_ptr(); - unsafe { std::ptr::copy_nonoverlapping(src, dst, item_size) }; - dst = unsafe { dst.add(item_size) }; - } + unsafe { + std::ptr::write(dst, item); + dst = dst.add(1); + } + }); assert_eq!( - unsafe { dst.offset_from(buffer.data.as_ptr()) } as usize, + unsafe { (dst as *mut u8).offset_from(buffer.data.as_ptr()) } as usize, len, "Trusted iterator length was not accurately reported" ); diff --git a/arrow-buffer/src/buffer/ops.rs b/arrow-buffer/src/buffer/ops.rs index 36efe876432..575651d2e45 100644 --- a/arrow-buffer/src/buffer/ops.rs +++ b/arrow-buffer/src/buffer/ops.rs @@ -77,29 +77,20 @@ pub fn bitwise_bin_op_helper( right: &Buffer, right_offset_in_bits: usize, len_in_bits: usize, - mut op: F, + op: F, ) -> Buffer where F: FnMut(u64, u64) -> u64, { - let left_chunks = left.bit_chunks(left_offset_in_bits, len_in_bits); - let right_chunks = right.bit_chunks(right_offset_in_bits, len_in_bits); - - let chunks = left_chunks - .iter() - .zip(right_chunks.iter()) - .map(|(left, right)| op(left, right)); - // Soundness: `BitChunks` is a `BitChunks` iterator which - // correctly reports its upper bound - let mut buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) }; - - let remainder_bytes = ceil(left_chunks.remainder_len(), 8); - let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits()); - // we are counting its starting from the least significant bit, to to_le_bytes should be correct - let rem = 
&rem.to_le_bytes()[0..remainder_bytes]; - buffer.extend_from_slice(rem); - - buffer.into() + BooleanBuffer::from_bitwise_binary_op( + left, + left_offset_in_bits, + right, + right_offset_in_bits, + len_in_bits, + op, + ) + .into_inner() } /// Apply a bitwise operation `op` to one input and return the result as a Buffer. @@ -113,34 +104,13 @@ pub fn bitwise_unary_op_helper( left: &Buffer, offset_in_bits: usize, len_in_bits: usize, - mut op: F, + op: F, ) -> Buffer where F: FnMut(u64) -> u64, { - // reserve capacity and set length so we can get a typed view of u64 chunks - let mut result = - MutableBuffer::new(ceil(len_in_bits, 8)).with_bitset(len_in_bits / 64 * 8, false); - - let left_chunks = left.bit_chunks(offset_in_bits, len_in_bits); - - let result_chunks = result.typed_data_mut::().iter_mut(); - - result_chunks - .zip(left_chunks.iter()) - .for_each(|(res, left)| { - *res = op(left); - }); - - let remainder_bytes = ceil(left_chunks.remainder_len(), 8); - let rem = op(left_chunks.remainder_bits()); - // we are counting its starting from the least significant bit, to to_le_bytes should be correct - let rem = &rem.to_le_bytes()[0..remainder_bytes]; - result.extend_from_slice(rem); - - result.into() + BooleanBuffer::from_bitwise_unary_op(left, offset_in_bits, len_in_bits, op).into_inner() } - /// Apply a bitwise and to two inputs and return the result as a Buffer. /// The inputs are treated as bitmaps, meaning that offsets and length are specified in number of bits. pub fn buffer_bin_and( diff --git a/arrow-buffer/src/util/bit_chunk_iterator.rs b/arrow-buffer/src/util/bit_chunk_iterator.rs index 8c7ec5e9a8f..83f425c5534 100644 --- a/arrow-buffer/src/util/bit_chunk_iterator.rs +++ b/arrow-buffer/src/util/bit_chunk_iterator.rs @@ -17,7 +17,7 @@ //! Types for iterating over bitmasks in 64-bit chunks -use crate::util::bit_util::ceil; +use crate::{bit_util::read_u64, util::bit_util::ceil}; use std::fmt::Debug; /// Iterates over an arbitrarily aligned byte buffer @@ -27,7 +27,7 @@ use std::fmt::Debug; /// /// This is unlike [`BitChunkIterator`] which only exposes a trailing u64, /// and consequently has to perform more work for each read -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct UnalignedBitChunk<'a> { lead_padding: usize, trailing_padding: usize, @@ -158,30 +158,189 @@ impl<'a> UnalignedBitChunk<'a> { /// Returns an iterator over the chunks pub fn iter(&self) -> UnalignedBitChunkIterator<'a> { - self.prefix - .into_iter() - .chain(self.chunks.iter().cloned()) - .chain(self.suffix) + UnalignedBitChunkIterator { + prefix: self.prefix, + chunks: self.chunks, + suffix: self.suffix, + } + } + + /// Returns a zipped iterator over two [`UnalignedBitChunk`] + #[inline] + pub fn zip(&self, other: &UnalignedBitChunk<'a>) -> UnalignedBitChunkZipIterator<'a> { + UnalignedBitChunkZipIterator { + left: self.iter(), + right: other.iter(), + } } /// Counts the number of ones pub fn count_ones(&self) -> usize { - self.iter().map(|x| x.count_ones() as usize).sum() + let prefix_count = self.prefix.map(|x| x.count_ones() as usize).unwrap_or(0); + let chunks_count: usize = self.chunks.iter().map(|&x| x.count_ones() as usize).sum(); + let suffix_count = self.suffix.map(|x| x.count_ones() as usize).unwrap_or(0); + prefix_count + chunks_count + suffix_count } } -/// Iterator over an [`UnalignedBitChunk`] -pub type UnalignedBitChunkIterator<'a> = std::iter::Chain< - std::iter::Chain, std::iter::Cloned>>, - std::option::IntoIter, ->; +/// An iterator over the chunks of an [`UnalignedBitChunk`] 
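+/// Yields the prefix word (if any), then the aligned 64-bit body words, then the suffix word (if any).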
+#[derive(Debug, Clone)] +pub struct UnalignedBitChunkIterator<'a> { + prefix: Option, + chunks: &'a [u64], + suffix: Option, +} -#[inline] -fn read_u64(input: &[u8]) -> u64 { - let len = input.len().min(8); - let mut buf = [0_u8; 8]; - buf[..len].copy_from_slice(input); - u64::from_le_bytes(buf) +impl<'a> Iterator for UnalignedBitChunkIterator<'a> { + type Item = u64; + + #[inline] + fn next(&mut self) -> Option { + if let Some(prefix) = self.prefix.take() { + return Some(prefix); + } + if let Some((&first, rest)) = self.chunks.split_first() { + self.chunks = rest; + return Some(first); + } + self.suffix.take() + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, Some(len)) + } + + #[inline] + fn fold(mut self, init: B, mut f: F) -> B + where + F: FnMut(B, Self::Item) -> B, + { + let mut acc = init; + if let Some(prefix) = self.prefix.take() { + acc = f(acc, prefix); + } + for &chunk in self.chunks { + acc = f(acc, chunk); + } + self.chunks = &[]; + if let Some(suffix) = self.suffix.take() { + acc = f(acc, suffix); + } + acc + } +} + +impl<'a> UnalignedBitChunkIterator<'a> { + /// Returns a zipped iterator over two [`UnalignedBitChunkIterator`] + #[inline] + pub fn zip(self, other: UnalignedBitChunkIterator<'a>) -> UnalignedBitChunkZipIterator<'a> { + UnalignedBitChunkZipIterator { + left: self, + right: other, + } + } +} + +impl ExactSizeIterator for UnalignedBitChunkIterator<'_> { + #[inline] + fn len(&self) -> usize { + self.prefix.is_some() as usize + self.chunks.len() + self.suffix.is_some() as usize + } +} + +impl std::iter::FusedIterator for UnalignedBitChunkIterator<'_> {} + +impl<'a> DoubleEndedIterator for UnalignedBitChunkIterator<'a> { + #[inline] + fn next_back(&mut self) -> Option { + if let Some(suffix) = self.suffix.take() { + return Some(suffix); + } + if let Some((&last, rest)) = self.chunks.split_last() { + self.chunks = rest; + return Some(last); + } + self.prefix.take() + } +} + +/// An iterator over zipped [`UnalignedBitChunk`] +#[derive(Debug)] +pub struct UnalignedBitChunkZipIterator<'a> { + left: UnalignedBitChunkIterator<'a>, + right: UnalignedBitChunkIterator<'a>, +} + +impl<'a> Iterator for UnalignedBitChunkZipIterator<'a> { + type Item = (u64, u64); + + #[inline] + fn next(&mut self) -> Option { + Some((self.left.next()?, self.right.next()?)) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.left.size_hint() + } + + #[inline] + fn fold(mut self, init: B, mut f: F) -> B + where + F: FnMut(B, Self::Item) -> B, + { + let mut acc = init; + + // 1. Consume elements until both are at the 'chunks' stage or one is exhausted. + while self.left.prefix.is_some() || self.right.prefix.is_some() { + if let (Some(l), Some(r)) = (self.left.next(), self.right.next()) { + acc = f(acc, (l, r)); + } else { + return acc; + } + } + + // 2. Now both prefix are None. Zip the chunks. + let chunk_count = self.left.chunks.len().min(self.right.chunks.len()); + if chunk_count > 0 { + let (l_chunks, l_rest) = self.left.chunks.split_at(chunk_count); + let (r_chunks, r_rest) = self.right.chunks.split_at(chunk_count); + + for (&l, &r) in l_chunks.iter().zip(r_chunks.iter()) { + acc = f(acc, (l, r)); + } + + self.left.chunks = l_rest; + self.right.chunks = r_rest; + } + + // 3. 
Consume remaining (suffix) + while let (Some(l), Some(r)) = (self.left.next(), self.right.next()) { + acc = f(acc, (l, r)); + } + acc + } + + #[inline] + fn for_each(self, mut f: F) + where + F: FnMut(Self::Item), + { + self.fold((), |_, item| f(item)); + } +} + +impl ExactSizeIterator for UnalignedBitChunkZipIterator<'_> {} + +impl std::iter::FusedIterator for UnalignedBitChunkZipIterator<'_> {} + +impl DoubleEndedIterator for UnalignedBitChunkZipIterator<'_> { + #[inline] + fn next_back(&mut self) -> Option { + Some((self.left.next_back()?, self.right.next_back()?)) + } } #[inline] @@ -251,6 +410,17 @@ pub struct BitChunkIterator<'a> { index: usize, } +impl<'a> BitChunkIterator<'a> { + /// Returns a zipped iterator over two [`BitChunkIterator`] + #[inline] + pub fn zip(self, other: BitChunkIterator<'a>) -> BitChunksZipIterator<'a> { + BitChunksZipIterator { + left: self, + right: other, + } + } +} + impl<'a> BitChunks<'a> { /// Returns the number of remaining bits, guaranteed to be between 0 and 63 (inclusive) #[inline] @@ -327,10 +497,197 @@ impl<'a> BitChunks<'a> { /// Returns an iterator over chunks of 64 bits, with the remaining bits zero padded to 64-bits #[inline] pub fn iter_padded(&self) -> impl Iterator + 'a { - self.iter().chain(std::iter::once(self.remainder_bits())) + let remainder = (self.remainder_len > 0).then(|| self.remainder_bits()); + self.iter().chain(remainder) + } + + /// Returns a zipped iterator over two [`BitChunks`] with the remaining bits zero padded to 64-bits + /// + /// # Panics + /// + /// Panics if the chunk lengths are not equal + #[inline] + pub fn zip_padded(&self, other: &BitChunks<'a>) -> impl Iterator + 'a { + assert_eq!(self.remainder_len, other.remainder_len); + let remainder = + (self.remainder_len > 0).then(|| (self.remainder_bits(), other.remainder_bits())); + self.zip(other).chain(remainder) + } + /// Returns a zipped iterator over two [`BitChunks`] + #[inline] + pub fn zip(&self, other: &BitChunks<'a>) -> BitChunksZipIterator<'a> { + assert_eq!(self.chunk_len, other.chunk_len); + BitChunksZipIterator { + left: self.iter(), + right: other.iter(), + } } } +/// An iterator over zipped chunks of 64 bits +#[derive(Debug)] +pub struct BitChunksZipIterator<'a> { + left: BitChunkIterator<'a>, + right: BitChunkIterator<'a>, +} + +impl Iterator for BitChunksZipIterator<'_> { + type Item = (u64, u64); + + #[inline] + fn next(&mut self) -> Option { + Some((self.left.next()?, self.right.next()?)) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.left.size_hint() + } + + #[inline] + fn fold(self, init: B, mut f: F) -> B + where + F: FnMut(B, Self::Item) -> B, + { + let mut acc = init; + let chunk_len = self.left.chunk_len; + let index = self.left.index; + if index >= chunk_len { + return acc; + } + + let left_data = self.left.buffer.as_ptr() as *const u64; + let right_data = self.right.buffer.as_ptr() as *const u64; + + let l_off = self.left.bit_offset; + let r_off = self.right.bit_offset; + + match (l_off, r_off) { + (0, 0) => { + for i in index..chunk_len { + unsafe { + let l = std::ptr::read_unaligned(left_data.add(i)).to_le(); + let r = std::ptr::read_unaligned(right_data.add(i)).to_le(); + acc = f(acc, (l, r)); + } + } + } + (0, r_off) => { + let r_shift_high = 64 - r_off; + if self.right.buffer.len() >= (chunk_len + 1) * 8 { + let mut r_curr = + unsafe { std::ptr::read_unaligned(right_data.add(index)).to_le() }; + for i in index..chunk_len { + unsafe { + let l = std::ptr::read_unaligned(left_data.add(i)).to_le(); + let r_next = 
std::ptr::read_unaligned(right_data.add(i + 1)).to_le(); + let r = (r_curr >> r_off) | (r_next << r_shift_high); + acc = f(acc, (l, r)); + r_curr = r_next; + } + } + } else { + for i in index..chunk_len { + unsafe { + let l = std::ptr::read_unaligned(left_data.add(i)).to_le(); + let r_low = std::ptr::read_unaligned(right_data.add(i)).to_le(); + let r_high = + std::ptr::read_unaligned(right_data.add(i + 1) as *const u8) as u64; + let r = (r_low >> r_off) | (r_high << r_shift_high); + acc = f(acc, (l, r)); + } + } + } + } + (l_off, 0) => { + let l_shift_high = 64 - l_off; + if self.left.buffer.len() >= (chunk_len + 1) * 8 { + let mut l_curr = + unsafe { std::ptr::read_unaligned(left_data.add(index)).to_le() }; + for i in index..chunk_len { + unsafe { + let l_next = std::ptr::read_unaligned(left_data.add(i + 1)).to_le(); + let l = (l_curr >> l_off) | (l_next << l_shift_high); + let r = std::ptr::read_unaligned(right_data.add(i)).to_le(); + acc = f(acc, (l, r)); + l_curr = l_next; + } + } + } else { + for i in index..chunk_len { + unsafe { + let l_low = std::ptr::read_unaligned(left_data.add(i)).to_le(); + let l_high = + std::ptr::read_unaligned(left_data.add(i + 1) as *const u8) as u64; + let l = (l_low >> l_off) | (l_high << l_shift_high); + let r = std::ptr::read_unaligned(right_data.add(i)).to_le(); + acc = f(acc, (l, r)); + } + } + } + } + (l_off, r_off) => { + let l_shift_high = 64 - l_off; + let r_shift_high = 64 - r_off; + + // We can use a faster sliding window if we have padding + // Arrow buffers usually have 64-byte padding + let l_padded = self.left.buffer.len() >= (chunk_len + 1) * 8; + let r_padded = self.right.buffer.len() >= (chunk_len + 1) * 8; + + if l_padded && r_padded { + let mut l_curr = + unsafe { std::ptr::read_unaligned(left_data.add(index)).to_le() }; + let mut r_curr = + unsafe { std::ptr::read_unaligned(right_data.add(index)).to_le() }; + + for i in index..chunk_len { + unsafe { + let l_next = std::ptr::read_unaligned(left_data.add(i + 1)).to_le(); + let r_next = std::ptr::read_unaligned(right_data.add(i + 1)).to_le(); + + let l = (l_curr >> l_off) | (l_next << l_shift_high); + let r = (r_curr >> r_off) | (r_next << r_shift_high); + + acc = f(acc, (l, r)); + l_curr = l_next; + r_curr = r_next; + } + } + } else { + // Fallback to safe but slower byte reads for high bits + for i in index..chunk_len { + unsafe { + let l_low = std::ptr::read_unaligned(left_data.add(i)).to_le(); + let l_high = + std::ptr::read_unaligned(left_data.add(i + 1) as *const u8) as u64; + let l = (l_low >> l_off) | (l_high << l_shift_high); + + let r_low = std::ptr::read_unaligned(right_data.add(i)).to_le(); + let r_high = + std::ptr::read_unaligned(right_data.add(i + 1) as *const u8) as u64; + let r = (r_low >> r_off) | (r_high << r_shift_high); + + acc = f(acc, (l, r)); + } + } + } + } + } + acc + } + + #[inline] + fn for_each(self, mut f: F) + where + F: FnMut(Self::Item), + { + self.fold((), |_, item| f(item)); + } +} + +impl ExactSizeIterator for BitChunksZipIterator<'_> {} + impl<'a> IntoIterator for BitChunks<'a> { type Item = u64; type IntoIter = BitChunkIterator<'a>; @@ -378,10 +735,63 @@ impl Iterator for BitChunkIterator<'_> { #[inline] fn size_hint(&self) -> (usize, Option) { - ( - self.chunk_len - self.index, - Some(self.chunk_len - self.index), - ) + let len = self.chunk_len - self.index; + (len, Some(len)) + } + + #[inline] + fn fold(self, init: B, mut f: F) -> B + where + F: FnMut(B, Self::Item) -> B, + { + let mut acc = init; + let chunk_len = self.chunk_len; + let index = 
self.index; + if index >= chunk_len { + return acc; + } + + let data = self.buffer.as_ptr() as *const u64; + let bit_offset = self.bit_offset; + + if bit_offset == 0 { + for i in index..chunk_len { + let v = unsafe { std::ptr::read_unaligned(data.add(i)).to_le() }; + acc = f(acc, v); + } + } else { + let shift_high = 64 - bit_offset; + // Use sliding window if padded + if self.buffer.len() >= (chunk_len + 1) * 8 { + let mut curr = unsafe { std::ptr::read_unaligned(data.add(index)).to_le() }; + for i in index..chunk_len { + unsafe { + let next = std::ptr::read_unaligned(data.add(i + 1)).to_le(); + let v = (curr >> bit_offset) | (next << shift_high); + acc = f(acc, v); + curr = next; + } + } + } else { + for i in index..chunk_len { + unsafe { + let low = std::ptr::read_unaligned(data.add(i)).to_le(); + let high = std::ptr::read_unaligned(data.add(i + 1) as *const u8) as u64; + let v = (low >> bit_offset) | (high << shift_high); + acc = f(acc, v); + } + } + } + } + acc + } + + #[inline] + fn for_each(self, mut f: F) + where + F: FnMut(Self::Item), + { + self.fold((), |_, item| f(item)); } } @@ -400,7 +810,7 @@ mod tests { use rand::rng; use crate::buffer::Buffer; - use crate::util::bit_chunk_iterator::UnalignedBitChunk; + use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk}; #[test] fn test_iter_aligned() { @@ -698,6 +1108,129 @@ mod tests { assert_eq!(unaligned.trailing_padding(), 62); } + #[test] + fn test_fold() { + let input: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]; + let bitchunks = BitChunks::new(input, 0, 128); + let result = bitchunks.iter().fold(Vec::new(), |mut acc, x| { + acc.push(x); + acc + }); + assert_eq!(vec![0x0706050403020100, 0x0f0e0d0c0b0a0908], result); + + let bitchunks = BitChunks::new(input, 4, 64); + let result = bitchunks.iter().fold(Vec::new(), |mut acc, x| { + acc.push(x); + acc + }); + // 0x080706050403020100 + // offset 4 bits + let expected = (0x080706050403020100_u128 >> 4) as u64; + assert_eq!(vec![expected], result); + } + + #[test] + fn test_fold_zip() { + let left: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7]; + let right: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8]; + let left_chunks = BitChunks::new(left, 0, 64); + let right_chunks = BitChunks::new(right, 0, 64); + + let result = + left_chunks + .iter() + .zip(right_chunks.iter()) + .fold(Vec::new(), |mut acc, (l, r)| { + acc.push(l ^ r); + acc + }); + assert_eq!(vec![0x0706050403020100 ^ 0x0807060504030201], result); + + let _left_chunks = BitChunks::new(left, 4, 32); + let _right_chunks = BitChunks::new(right, 8, 32); + // chunk_len is 0 for 32 bits. 
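+        // (BitChunks only yields complete 64-bit chunks, so all 32 bits fall into the remainder)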
+ // let's try 64 bits with offset + let left: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let right: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let left_chunks = BitChunks::new(left, 4, 64); + let right_chunks = BitChunks::new(right, 8, 64); + + let result = + left_chunks + .iter() + .zip(right_chunks.iter()) + .fold(Vec::new(), |mut acc, (l, r)| { + acc.push(l & r); + acc + }); + + let l_expected = { + let mut b = [0u8; 16]; + b[0..9].copy_from_slice(&left[0..9]); + u128::from_le_bytes(b) >> 4 + } as u64; + let r_expected = { + let mut b = [0u8; 16]; + b[0..9].copy_from_slice(&right[0..9]); + u128::from_le_bytes(b) >> 8 + } as u64; + assert_eq!(vec![l_expected & r_expected], result); + } + + #[test] + fn test_unaligned_fold_zip() { + let left: &[u8] = &[ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + ]; + let right: &[u8] = &[ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, + ]; + + let cases = [ + (0, 0, 128), + (4, 4, 64), + (4, 8, 64), + (0, 8, 128), + (1, 2, 128), + (7, 0, 128), + ]; + + for (l_off, r_off, len) in cases { + let left_bc = UnalignedBitChunk::new(left, l_off, len); + let right_bc = UnalignedBitChunk::new(right, r_off, len); + + let expected: Vec<_> = left_bc.iter().zip(right_bc.iter()).collect(); + + // Test fold + let actual: Vec<_> = left_bc.zip(&right_bc).fold(Vec::new(), |mut acc, x| { + acc.push(x); + acc + }); + assert_eq!( + expected, actual, + "Fold failed for l_off={}, r_off={}, len={}", + l_off, r_off, len + ); + + // Test for_each + let mut actual_for_each = Vec::new(); + left_bc.zip(&right_bc).for_each(|x| actual_for_each.push(x)); + assert_eq!( + expected, actual_for_each, + "ForEach failed for l_off={}, r_off={}, len={}", + l_off, r_off, len + ); + + // Test next() + let actual_next: Vec<_> = left_bc.zip(&right_bc).collect(); + assert_eq!( + expected, actual_next, + "Next failed for l_off={}, r_off={}, len={}", + l_off, r_off, len + ); + } + } + #[test] #[cfg_attr(miri, ignore)] fn fuzz_unaligned_bit_chunk_iterator() { diff --git a/arrow-buffer/src/util/bit_util.rs b/arrow-buffer/src/util/bit_util.rs index 67c72fc0890..cc9e8cd5fbd 100644 --- a/arrow-buffer/src/util/bit_util.rs +++ b/arrow-buffer/src/util/bit_util.rs @@ -90,6 +90,15 @@ pub unsafe fn unset_bit_raw(data: *mut u8, i: usize) { } } +/// Read a u64 from a byte slice, padding with zeros if necessary +#[inline] +pub fn read_u64(input: &[u8]) -> u64 { + let len = input.len().min(8); + let mut buf = [0_u8; 8]; + buf[..len].copy_from_slice(input); + u64::from_le_bytes(buf) +} + /// Returns the ceil of `value`/`divisor` #[inline] pub fn ceil(value: usize, divisor: usize) -> usize { diff --git a/arrow-select/src/nullif.rs b/arrow-select/src/nullif.rs index fa875c20e30..3a9f118560f 100644 --- a/arrow-select/src/nullif.rs +++ b/arrow-select/src/nullif.rs @@ -90,13 +90,10 @@ pub fn nullif(left: &dyn Array, right: &BooleanArray) -> Result { - let mut null_count = 0; - let buffer = bitwise_unary_op_helper(right.inner(), right.offset(), len, |b| { - let t = !b; - null_count += t.count_zeros() as usize; - t - }); - (buffer, null_count) + let buffer: arrow_buffer::Buffer = + bitwise_unary_op_helper(right.inner(), right.offset(), len, |r| !r); + let true_count = buffer.count_set_bits_offset(0, len); + (buffer, len - true_count) } }; diff --git a/parquet/src/arrow/buffer/bit_util.rs b/parquet/src/arrow/buffer/bit_util.rs index 985943b851a..96a6baa5ba6 100644 --- a/parquet/src/arrow/buffer/bit_util.rs +++ b/parquet/src/arrow/buffer/bit_util.rs @@ -30,11 
+30,7 @@ pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
     let unaligned = UnalignedBitChunk::new(bytes, 0, bit_length);
     let mut chunk_end_idx = bit_length + unaligned.lead_padding() + unaligned.trailing_padding();
 
-    let iter = unaligned
-        .prefix()
-        .into_iter()
-        .chain(unaligned.chunks().iter().cloned())
-        .chain(unaligned.suffix());
+    let iter = unaligned.iter();
 
     iter.rev().flat_map(move |chunk| {
         let chunk_idx = chunk_end_idx - 64;
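The `fold` specializations above all reduce to the same recombination step: a 64-bit chunk that starts `off` bits into a buffer is the current little-endian word shifted down, combined with the low `off` bits of the next word. A safe standalone sketch of that step (`chunk_at_bit_offset` is an illustrative name, not a crate API; the crate's versions use unaligned pointer reads and handle an unpadded final word separately):

fn chunk_at_bit_offset(words: &[u64], i: usize, off: u32) -> u64 {
    assert!(off < 64);
    if off == 0 {
        // Shifting by 64 would overflow, which is why the crate code branches on a zero offset.
        return words[i];
    }
    (words[i] >> off) | (words[i + 1] << (64 - off))
}

fn main() {
    let words = [0xAAAA_AAAA_AAAA_AAAA_u64, 0x5555_5555_5555_5555_u64];
    // Read 64 bits starting at bit 4: the low nibble of the next word fills the top nibble.
    let v = chunk_at_bit_offset(&words, 0, 4);
    assert_eq!(v, 0x5AAA_AAAA_AAAA_AAAA_u64);
    assert_eq!(v >> 60, 0x5);
}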