Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 27 additions & 99 deletions parquet-variant-compute/src/from_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,109 +18,42 @@
//! Module for transforming a batch of JSON strings into a batch of Variants represented as
//! STRUCT<metadata: BINARY, value: BINARY>

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray};
use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field};
use crate::{VariantArray, VariantArrayBuilder};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_schema::ArrowError;
use parquet_variant::VariantBuilder;
use parquet_variant_json::json_to_variant;

fn variant_arrow_repr() -> DataType {
// The subfields are expected to be non-nullable according to the parquet variant spec.
let metadata_field = Field::new("metadata", DataType::Binary, false);
let value_field = Field::new("value", DataType::Binary, false);
let fields = vec![metadata_field, value_field];
DataType::Struct(fields.into())
}

/// Parse a batch of JSON strings into a batch of Variants represented as
/// STRUCT<metadata: BINARY, value: BINARY> where nulls are preserved. The JSON strings in the input
/// must be valid.
pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<StructArray, ArrowError> {
pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<VariantArray, ArrowError> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change 1: a new VariantArray that wraps a StructArray and does basic validation

let input_string_array = match input.as_any().downcast_ref::<StringArray>() {
Some(string_array) => Ok(string_array),
None => Err(ArrowError::CastError(
"Expected reference to StringArray as input".into(),
)),
}?;

// Zero-copy builders
let mut metadata_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128);
let mut metadata_offsets: Vec<i32> = Vec::with_capacity(input.len() + 1);
let mut metadata_validity = BooleanBufferBuilder::new(input.len());
let mut metadata_current_offset: i32 = 0;
metadata_offsets.push(metadata_current_offset);

let mut value_buffer: Vec<u8> = Vec::with_capacity(input.len() * 128);
let mut value_offsets: Vec<i32> = Vec::with_capacity(input.len() + 1);
let mut value_validity = BooleanBufferBuilder::new(input.len());
let mut value_current_offset: i32 = 0;
value_offsets.push(value_current_offset);

let mut validity = BooleanBufferBuilder::new(input.len());
let mut variant_array_builder = VariantArrayBuilder::new(input_string_array.len());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR refactors the details of creating the output arrays into VariantArrayBuilder so it can be reused

for i in 0..input.len() {
if input.is_null(i) {
// The subfields are expected to be non-nullable according to the parquet variant spec.
metadata_validity.append(true);
value_validity.append(true);
metadata_offsets.push(metadata_current_offset);
value_offsets.push(value_current_offset);
validity.append(false);
variant_array_builder.append_null();
} else {
let mut vb = VariantBuilder::new();
json_to_variant(input_string_array.value(i), &mut vb)?;
let (metadata, value) = vb.finish();
validity.append(true);

metadata_current_offset += metadata.len() as i32;
metadata_buffer.extend(metadata);
metadata_offsets.push(metadata_current_offset);
metadata_validity.append(true);

value_current_offset += value.len() as i32;
value_buffer.extend(value);
value_offsets.push(value_current_offset);
value_validity.append(true);
variant_array_builder.append_variant_buffers(&metadata, &value);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have ideas on how to make this more efficient by writing directly into the target buffers. I will work on this in a follow on PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
let metadata_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(metadata_offsets));
let metadata_data_buffer = Buffer::from_vec(metadata_buffer);
let metadata_null_buffer = NullBuffer::new(metadata_validity.finish());

let value_offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(value_offsets));
let value_data_buffer = Buffer::from_vec(value_buffer);
let value_null_buffer = NullBuffer::new(value_validity.finish());

let metadata_array = BinaryArray::new(
metadata_offsets_buffer,
metadata_data_buffer,
Some(metadata_null_buffer),
);
let value_array = BinaryArray::new(
value_offsets_buffer,
value_data_buffer,
Some(value_null_buffer),
);

let struct_fields: Vec<ArrayRef> = vec![Arc::new(metadata_array), Arc::new(value_array)];
let variant_fields = match variant_arrow_repr() {
DataType::Struct(fields) => fields,
_ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"),
};
let null_buffer = NullBuffer::new(validity.finish());
Ok(StructArray::new(
variant_fields,
struct_fields,
Some(null_buffer),
))
Ok(variant_array_builder.build())
}

#[cfg(test)]
mod test {
use crate::batch_json_string_to_variant;
use arrow::array::{Array, ArrayRef, BinaryArray, StringArray};
use arrow::array::{Array, ArrayRef, AsArray, StringArray};
use arrow_schema::ArrowError;
use parquet_variant::{Variant, VariantBuilder};
use std::sync::Arc;
Expand All @@ -135,43 +68,38 @@ mod test {
None,
]);
let array_ref: ArrayRef = Arc::new(input);
let output = batch_json_string_to_variant(&array_ref).unwrap();
let variant_array = batch_json_string_to_variant(&array_ref).unwrap();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test uses the new VariantArray APIs to access the relevant fields / value / etc


let struct_array = &output;
let metadata_array = struct_array
.column(0)
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
let value_array = struct_array
.column(1)
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
let metadata_array = variant_array.metadata_field().as_binary_view();
let value_array = variant_array.value_field().as_binary_view();

assert!(!struct_array.is_null(0));
assert!(struct_array.is_null(1));
assert!(!struct_array.is_null(2));
assert!(!struct_array.is_null(3));
assert!(struct_array.is_null(4));
// Compare row 0
assert!(!variant_array.is_null(0));
assert_eq!(variant_array.value(0), Variant::Int8(1));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think VariantArray::value() that returns Variant is particularly nice to use :bowtie:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's also how the Go VariantArray works as well, Value() on the VariantArray returns a Variant (performing the consolidation if the underlying VariantArray is shredded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't fully worked out in my head how this will work for shredded Variants -- specifically how to avoid copying stuff around when accessing a shredded variant. We'll have to figure that out in follow on PRs


assert_eq!(metadata_array.value(0), &[1, 0, 0]);
assert_eq!(value_array.value(0), &[12, 1]);
// Compare row 1
assert!(variant_array.is_null(1));

// Compare row 2
assert!(!variant_array.is_null(2));
{
let mut vb = VariantBuilder::new();
let mut ob = vb.new_object();
ob.insert("a", Variant::Int8(32));
ob.finish()?;
let (object_metadata, object_value) = vb.finish();
assert_eq!(metadata_array.value(2), &object_metadata);
assert_eq!(value_array.value(2), &object_value);
let expected = Variant::new(&object_metadata, &object_value);
assert_eq!(variant_array.value(2), expected);
}

assert_eq!(metadata_array.value(3), &[1, 0, 0]);
assert_eq!(value_array.value(3), &[0]);
// Compare row 3 (Note this is a variant NULL, not a null row)
assert!(!variant_array.is_null(3));
assert_eq!(variant_array.value(3), Variant::Null);

// Compare row 4
assert!(variant_array.is_null(4));

// Ensure that the subfields are not actually nullable
// Ensure that the subfields are not nullable
assert!(!metadata_array.is_null(1));
assert!(!value_array.is_null(1));
assert!(!metadata_array.is_null(4));
Expand Down
5 changes: 5 additions & 0 deletions parquet-variant-compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

mod from_json;
mod to_json;
mod variant_array;
mod variant_array_builder;

pub use variant_array::VariantArray;
pub use variant_array_builder::VariantArrayBuilder;

pub use from_json::batch_json_string_to_variant;
pub use to_json::batch_variant_to_json_string;
Loading
Loading