Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/moonlink_backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,9 @@ mod tests {
// Validate metadatas are returned in the correct order.
assert_eq!(parquet_metadatas.len(), 2);
let metadata_1 = deserialize_parquet_metadata(&parquet_metadatas[0][..]);
assert_eq!(metadata_1.num_rows, 5);
assert_eq!(metadata_1.file_metadata().num_rows(), 5);
let metadata_2 = deserialize_parquet_metadata(&parquet_metadatas[1][..]);
assert_eq!(metadata_2.num_rows, 2);
assert_eq!(metadata_2.file_metadata().num_rows(), 2);
}

#[tokio::test]
Expand Down
82 changes: 36 additions & 46 deletions src/moonlink_backend/src/parquet_utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::error::{Error, Result};

use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use std::io::SeekFrom;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;

#[cfg(test)]
use parquet::format::FileMetaData;

/// Parquet file footer size.
const FOOTER_SIZE: u64 = 8;
/// Parquet file magic bytes ("PAR1").
Expand Down Expand Up @@ -59,40 +56,25 @@ pub(crate) async fn get_parquet_serialized_metadata(filepath: &str) -> Result<Ve
Ok(buf)
}

#[cfg(test)]
pub(crate) fn deserialize_parquet_metadata(bytes: &[u8]) -> FileMetaData {
use parquet::thrift::TSerializable;
use thrift::protocol::TCompactInputProtocol;
use thrift::transport::TBufferChannel;

let mut chan = TBufferChannel::with_capacity(bytes.len(), /*write_capacity=*/ 0);
chan.set_readable_bytes(bytes);
let mut proto = TCompactInputProtocol::new(chan);
FileMetaData::read_from_in_protocol(&mut proto).unwrap()
/// Decodes a serialized Parquet metadata buffer into [`ParquetMetaData`].
///
/// `bytes` is handed unchanged to [`ParquetMetaDataReader::decode_metadata`].
///
/// # Panics
///
/// Panics with "Failed to decode metadata" if decoding returns an error.
// NOTE(review): every caller visible in this change lives in a test module, which is
// presumably why `dead_code` is allowed for non-test builds — confirm before removing.
#[allow(dead_code)]
pub(crate) fn deserialize_parquet_metadata(bytes: &[u8]) -> ParquetMetaData {
ParquetMetaDataReader::decode_metadata(bytes).expect("Failed to decode metadata")
}

#[cfg(test)]
mod tests {
use super::*;
use std::fs::File as StdFile;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::format::{FileMetaData, Statistics};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics;
use std::fs::File as StdFile;
use tempfile::tempdir;

// Util function to get min and max.
fn stats_min_max_i32(stats: &Statistics) -> Option<(i32, i32)> {
let min_bytes = stats.min_value.as_ref().or(stats.min.as_ref())?;
let max_bytes = stats.max_value.as_ref().or(stats.max.as_ref())?;

if min_bytes.len() != 4 || max_bytes.len() != 4 {
return None;
}
let min = i32::from_le_bytes([min_bytes[0], min_bytes[1], min_bytes[2], min_bytes[3]]);
let max = i32::from_le_bytes([max_bytes[0], max_bytes[1], max_bytes[2], max_bytes[3]]);
Some((min, max))
// Util function to decode a 4-byte little-endian slice into an i32.
// Panics if `bytes` is not exactly 4 bytes long.
fn bytes_to_i32(bytes: &[u8]) -> i32 {
    i32::from_le_bytes(
        bytes
            .try_into()
            .expect("slice with incorrect length"),
    )
}

#[tokio::test]
Expand All @@ -118,23 +100,31 @@ mod tests {
let buf = get_parquet_serialized_metadata(&parquet_path)
.await
.unwrap();
let file_md: FileMetaData = deserialize_parquet_metadata(&buf[..]);

assert_eq!(file_md.num_rows, 5);
assert_eq!(file_md.row_groups.len(), 1);
let rg = &file_md.row_groups[0];
assert_eq!(rg.columns.len(), 1);
let col = &rg.columns[0];
let meta = col.meta_data.as_ref().unwrap();
let stats = meta.statistics.as_ref().unwrap();
if let Some(nulls) = stats.null_count {
assert_eq!(nulls, 1);
} else {
panic!("expected null_count in column statistics");
let file_md: ParquetMetaData = deserialize_parquet_metadata(&buf[..]);

assert_eq!(file_md.file_metadata().num_rows(), 5);
assert_eq!(file_md.row_groups().len(), 1);
assert_eq!(file_md.row_groups()[0].num_columns(), 1);
let col_chunk: &Statistics = file_md
.row_group(0)
.column(0)
.statistics()
.expect("expected statistics");

match *col_chunk {
Statistics::Int32(ref stat) => {
if let Some(min_bytes) = stat.min_bytes_opt() {
let min_val = bytes_to_i32(min_bytes);
assert_eq!(min_val, 1);
}
if let Some(max_bytes) = stat.max_bytes_opt() {
let max_val = bytes_to_i32(max_bytes);
assert_eq!(max_val, 5);
}
}
_ => panic!("expected int32 statistics"),
}

let (min, max) = stats_min_max_i32(stats).unwrap();
assert_eq!(min, 1);
assert_eq!(max, 5);
let null_count_opt = col_chunk.null_count_opt().unwrap();
assert_eq!(null_count_opt, 1);
}
}