Skip to content

Commit 40ef4aa

Browse files
authored
Fix DeltaBitPack MiniBlock Bit Width Padding (#1418)
* Consistent DeltaBitPackEncoder bit width padding (#1416) Ignore non-zero padded bit widths in DeltaBitPackDecoder (#1417) * chore: review feedback * Add test of DeltaBitPackDecoder padding * Revert formatting
1 parent 729934c commit 40ef4aa

2 files changed

Lines changed: 100 additions & 5 deletions

File tree

parquet/src/encodings/decoding.rs

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,7 @@ where
639639
self.last_value = value;
640640
buffer[0] = value;
641641
read += 1;
642+
self.values_left -= 1;
642643
}
643644

644645
while read != to_read {
@@ -653,6 +654,14 @@ where
653654
.bit_reader
654655
.get_batch(&mut buffer[read..read + batch_to_read], bit_width);
655656

657+
if batch_read != batch_to_read {
658+
return Err(general_err!(
659+
"Expected to read {} values from miniblock got {}",
660+
batch_to_read,
661+
batch_read
662+
));
663+
}
664+
656665
// At this point we have read the deltas to `buffer` we now need to offset
657666
// these to get back to the original values that were encoded
658667
for v in &mut buffer[read..read + batch_read] {
@@ -668,9 +677,9 @@ where
668677

669678
read += batch_read;
670679
self.mini_block_remaining -= batch_read;
680+
self.values_left -= batch_read;
671681
}
672682

673-
self.values_left -= to_read;
674683
Ok(to_read)
675684
}
676685

@@ -928,7 +937,9 @@ mod tests {
928937
ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType,
929938
};
930939
use crate::util::{
931-
bit_util::set_array_bit, memory::MemTracker, test_common::RandGen,
940+
bit_util::set_array_bit,
941+
memory::{BufferPtr, MemTracker},
942+
test_common::RandGen,
932943
};
933944

934945
#[test]
@@ -1326,6 +1337,83 @@ mod tests {
13261337
assert_eq!(result, vec![29, 43, 89]);
13271338
}
13281339

1340+
#[test]
1341+
fn test_delta_bit_packed_padding() {
1342+
// Page header
1343+
let header = vec![
1344+
// Page Header
1345+
1346+
// Block Size - 256
1347+
128,
1348+
2,
1349+
// Miniblocks in block,
1350+
4,
1351+
// Total value count - 419
1352+
128 + 35,
1353+
3,
1354+
// First value - 7
1355+
7,
1356+
];
1357+
1358+
// Block Header
1359+
let block1_header = vec![
1360+
0, // Min delta
1361+
0, 1, 0, 0, // Bit widths
1362+
];
1363+
1364+
// Mini-block 1 - bit width 0 => 0 bytes
1365+
// Mini-block 2 - bit width 1 => 8 bytes
1366+
// Mini-block 3 - bit width 0 => 0 bytes
1367+
// Mini-block 4 - bit width 0 => 0 bytes
1368+
let block1 = vec![0xFF; 8];
1369+
1370+
// Block Header
1371+
let block2_header = vec![
1372+
0, // Min delta
1373+
0, 1, 2, 0xFF, // Bit widths, including non-zero padding
1374+
];
1375+
1376+
// Mini-block 1 - bit width 0 => 0 bytes
1377+
// Mini-block 2 - bit width 1 => 8 bytes
1378+
// Mini-block 3 - bit width 2 => 16 bytes
1379+
// Mini-block 4 - padding => no bytes
1380+
let block2 = vec![0xFF; 24];
1381+
1382+
let data: Vec<u8> = header
1383+
.into_iter()
1384+
.chain(block1_header)
1385+
.chain(block1)
1386+
.chain(block2_header)
1387+
.chain(block2)
1388+
.collect();
1389+
1390+
let length = data.len();
1391+
1392+
let ptr = BufferPtr::new(data);
1393+
let mut reader = BitReader::new(ptr.clone());
1394+
assert_eq!(reader.get_vlq_int().unwrap(), 256);
1395+
assert_eq!(reader.get_vlq_int().unwrap(), 4);
1396+
assert_eq!(reader.get_vlq_int().unwrap(), 419);
1397+
assert_eq!(reader.get_vlq_int().unwrap(), 7);
1398+
1399+
// Test output buffer larger than needed and not exact multiple of block size
1400+
let mut output = vec![0_i32; 420];
1401+
1402+
let mut decoder = DeltaBitPackDecoder::<Int32Type>::new();
1403+
decoder.set_data(ptr.clone(), 0).unwrap();
1404+
assert_eq!(decoder.get(&mut output).unwrap(), 419);
1405+
assert_eq!(decoder.get_offset(), length);
1406+
1407+
// Test with truncated buffer
1408+
decoder.set_data(ptr.range(0, 12), 0).unwrap();
1409+
let err = decoder.get(&mut output).unwrap_err().to_string();
1410+
assert!(
1411+
err.contains("Expected to read 64 values from miniblock got 8"),
1412+
"{}",
1413+
err
1414+
);
1415+
}
1416+
13291417
#[test]
13301418
fn test_delta_byte_array_same_arrays() {
13311419
let data = vec![

parquet/src/encodings/encoding.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
565565
return Ok(());
566566
}
567567

568-
let mut min_delta = i64::max_value();
568+
let mut min_delta = i64::MAX;
569569
for i in 0..self.values_in_block {
570570
min_delta = cmp::min(min_delta, self.deltas[i]);
571571
}
@@ -581,6 +581,13 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
581581
// values left
582582
let n = cmp::min(self.mini_block_size, self.values_in_block);
583583
if n == 0 {
584+
// Decoders should be agnostic to the padding value, we therefore use 0xFF
585+
// when running tests. However, not all implementations may handle this correctly
586+
// so pad with 0 when not running tests
587+
let pad_value = cfg!(test).then(|| 0xFF).unwrap_or(0);
588+
for j in i..self.num_mini_blocks {
589+
self.bit_writer.write_at(offset + j, pad_value);
590+
}
584591
break;
585592
}
586593

@@ -610,8 +617,8 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
610617
self.values_in_block -= n;
611618
}
612619

613-
assert!(
614-
self.values_in_block == 0,
620+
assert_eq!(
621+
self.values_in_block, 0,
615622
"Expected 0 values in block, found {}",
616623
self.values_in_block
617624
);

0 commit comments

Comments
 (0)