From e0f0c16208cfc3d12134deb257f9ac43405b8562 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 27 Aug 2025 19:56:14 -0400 Subject: [PATCH 1/7] add optional column handling to parquet exporter --- .../crates/otap/src/parquet_exporter.rs | 1 + .../otap/src/parquet_exporter/schema.rs | 145 ++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs index 1192b42a5..23cd8c8b6 100644 --- a/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs @@ -48,6 +48,7 @@ mod config; mod idgen; mod object_store; mod partition; +mod schema; mod writer; #[allow(dead_code)] diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs new file mode 100644 index 000000000..c925c6229 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs @@ -0,0 +1,145 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! This module contains code for manipulating the arrow record batch [`RecordBatch`] so that it +//! has a schema that's compatible with the Parquet writer. +//! +//! The main reason we can't write the record batch verbatim is because OTAP considers some columns +//! optional, but parquet spec requires that each row group contain a column chunk for every field +//! in the schema. This means we can't receive two consecutive OTAP batches for some payload type +//! and write them into the same writer. +//! +//! Note that although we also switch between the Dictionary and Native encodings, we don't need to +//! actually convert the existing columns to all be the same type. Parquet writer is able to accept +//! logically compatible types, which includes compatibility between types like `T` (native array) +//! and `Dictionary` (for any `K`). +//! +//! TODO do we also need to handle the mixed-up column order? +//! +//! To handle this, we need to add all-null columns for all the optional columns. + +use std::sync::{Arc, LazyLock}; + +use arrow::array::{ + ArrayRef, BinaryArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int64Array, + RecordBatch, StringArray, TimestampNanosecondArray, UInt8Array, UInt16Array, UInt32Array, + UInt64Array, +}; +use arrow::datatypes::{DataType, Field, Fields, Schema, TimeUnit}; +use otap_df_engine::error::Error; +use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType; +use otel_arrow_rust::schema::consts; + +/// Transform the +// TODO comments +pub fn transform_to_known_schema( + record_batch: &RecordBatch, + payload_type: ArrowPayloadType, +) -> Result { + let current_schema = record_batch.schema_ref(); + let template_schema = get_template_schema(payload_type); + + let mut new_columns = Vec::with_capacity(template_schema.fields.len()); + let mut new_fields = Vec::with_capacity(template_schema.fields.len()); + + for template_field in template_schema.fields() { + match current_schema.index_of(template_field.name()) { + Ok(current_field_index) => { + // TODO handle struct here + new_columns.push(record_batch.column(current_field_index).clone()); + new_fields.push(current_schema.fields[current_field_index].clone()); + } + Err(_) => { + let new_column = if template_field.is_nullable() { + get_all_null_column(template_field.data_type(), record_batch.num_rows())? 
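+                        // (an all-null array of the template's type is a valid stand-in
+                        // for a missing optional column)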
+ } else { + get_all_default_value_column( + template_field.data_type(), + record_batch.num_rows(), + )? + }; + let new_field = template_field + .as_ref() + .clone() + .with_data_type(new_column.data_type().clone()); + + new_columns.push(new_column); + new_fields.push(Arc::new(new_field)); + } + } + } + + todo!() +} + +fn get_template_schema(payload_type: ArrowPayloadType) -> &'static Schema { + match payload_type { + ArrowPayloadType::Logs => &LOGS_TEMPLATE_SCHEMA, + _ => { + todo!() + } + } +} + +static LOGS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| { + Schema::new(vec![ + Field::new(consts::ID, DataType::UInt16, true), + Field::new( + consts::RESOURCE, + DataType::Struct(Fields::from(vec![Field::new( + consts::ID, + DataType::UInt16, + true, + )])), + false, + ), + ]) +}); + +fn get_all_null_column(data_type: &DataType, length: usize) -> Result { + // TODO once run-end encoding, we can save some memory here by allocating a single RunArray + // with one null value, and one run-end of `length`. + // https://github.com/apache/arrow-rs/issues/8016 + Ok(match data_type { + DataType::Binary => Arc::new(BinaryArray::new_null(length)), + DataType::Boolean => Arc::new(BinaryArray::new_null(length)), + DataType::FixedSizeBinary(fsl_len) => { + Arc::new(FixedSizeBinaryArray::new_null(*fsl_len, length)) + } + DataType::Float32 => Arc::new(Float32Array::new_null(length)), + DataType::Float64 => Arc::new(Float64Array::new_null(length)), + DataType::Int64 => Arc::new(Int64Array::new_null(length)), + DataType::UInt8 => Arc::new(UInt8Array::new_null(length)), + DataType::UInt16 => Arc::new(UInt16Array::new_null(length)), + DataType::UInt32 => Arc::new(UInt32Array::new_null(length)), + DataType::UInt64 => Arc::new(UInt64Array::new_null(length)), + DataType::Utf8 => Arc::new(StringArray::new_null(length)), + DataType::Timestamp(time_unit, _) => match *time_unit { + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::new_null(length)), + _ => { + todo!() + } + }, + + DataType::Struct(fields) => get_struct_full_of_nulls(fields, true)?, + _ => { + todo!() + } + }) +} + +fn get_all_default_value_column(data_type: &DataType, length: usize) -> Result { + // TODO once run-end encoding, we can save some memory here by allocating a single RunArray + // with one default value, and one run-end of `length`. + // https://github.com/apache/arrow-rs/issues/8016 + match data_type { + DataType::Struct(fields) => get_struct_full_of_nulls(fields, false), + _ => { + todo!() + } + } +} + +fn get_struct_full_of_nulls(data_type: &Fields, struct_nullable: bool) -> Result { + todo!() +} From 59344e34c61300e366af4207c30498c550028775 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Thu, 28 Aug 2025 07:49:08 -0400 Subject: [PATCH 2/7] stash --- .../otap/src/parquet_exporter/schema.rs | 237 +++++++++++++++--- 1 file changed, 204 insertions(+), 33 deletions(-) diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs index c925c6229..37a07fac6 100644 --- a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs @@ -6,25 +6,25 @@ //! //! The main reason we can't write the record batch verbatim is because OTAP considers some columns //! optional, but parquet spec requires that each row group contain a column chunk for every field -//! in the schema. This means we can't receive two consecutive OTAP batches for some payload type -//! 
and write them into the same writer. +//! in the schema (and the column chunks must be in the correct order). +//! +//! This means we can't receive two consecutive OTAP batches for some payload type and write them +//! into the same writer. To handle this, we insert all null columns for missing columns (or all +//! default-value where the column is not nullable), and also arrange the columns so they're always +//! in the same order. //! //! Note that although we also switch between the Dictionary and Native encodings, we don't need to //! actually convert the existing columns to all be the same type. Parquet writer is able to accept //! logically compatible types, which includes compatibility between types like `T` (native array) //! and `Dictionary` (for any `K`). -//! -//! TODO do we also need to handle the mixed-up column order? -//! -//! To handle this, we need to add all-null columns for all the optional columns. +use std::iter::repeat_n; use std::sync::{Arc, LazyLock}; use arrow::array::{ - ArrayRef, BinaryArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int64Array, - RecordBatch, StringArray, TimestampNanosecondArray, UInt8Array, UInt16Array, UInt32Array, - UInt64Array, + ArrayRef, BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array }; +use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field, Fields, Schema, TimeUnit}; use otap_df_engine::error::Error; use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType; @@ -69,7 +69,14 @@ pub fn transform_to_known_schema( } } - todo!() + // safety: this shouldn't fail b/c we're creating a record batch where the columns all have + // the correct length and their datatypes match the schema + let new_rb = RecordBatch::try_new( + Arc::new(Schema::new(new_fields)), + new_columns, + ).expect("unexpected error creating record batch with known schema"); + + Ok(new_rb) } fn get_template_schema(payload_type: ArrowPayloadType) -> &'static Schema { @@ -81,24 +88,10 @@ fn get_template_schema(payload_type: ArrowPayloadType) -> &'static Schema { } } -static LOGS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| { - Schema::new(vec![ - Field::new(consts::ID, DataType::UInt16, true), - Field::new( - consts::RESOURCE, - DataType::Struct(Fields::from(vec![Field::new( - consts::ID, - DataType::UInt16, - true, - )])), - false, - ), - ]) -}); - fn get_all_null_column(data_type: &DataType, length: usize) -> Result { // TODO once run-end encoding, we can save some memory here by allocating a single RunArray - // with one null value, and one run-end of `length`. + // with one null value, and one run-end of `length`. 
This would allow us to allocate a few
+    // single length buffers instead of full-length empty buffers
     // https://github.com/apache/arrow-rs/issues/8016
     Ok(match data_type {
         DataType::Binary => Arc::new(BinaryArray::new_null(length)),
@@ -108,6 +101,7 @@ fn get_all_null_column(data_type: &DataType, length: usize) -> Result {
         DataType::Float32 => Arc::new(Float32Array::new_null(length)),
         DataType::Float64 => Arc::new(Float64Array::new_null(length)),
+        DataType::Int32 => Arc::new(Int32Array::new_null(length)),
         DataType::Int64 => Arc::new(Int64Array::new_null(length)),
         DataType::UInt8 => Arc::new(UInt8Array::new_null(length)),
         DataType::UInt16 => Arc::new(UInt16Array::new_null(length)),
@@ -121,7 +115,7 @@ fn get_all_null_column(data_type: &DataType, length: usize) -> Result {
         },
 
-        DataType::Struct(fields) => get_struct_full_of_nulls(fields, true)?,
+        DataType::Struct(fields) => get_struct_full_of_nulls_or_defaults(fields, length, true)?,
         _ => {
             todo!()
         }
@@ -130,16 +124,193 @@ fn get_all_null_column(data_type: &DataType, length: usize) -> Result {
 
 fn get_all_default_value_column(data_type: &DataType, length: usize) -> Result {
     // TODO once run-end encoding, we can save some memory here by allocating a single RunArray
-    // with one default value, and one run-end of `length`.
+    // with one default value, and one run-end of `length`. This would allow us to allocate a few
+    // single length buffers instead of full-length empty buffers
     // https://github.com/apache/arrow-rs/issues/8016
-    match data_type {
-        DataType::Struct(fields) => get_struct_full_of_nulls(fields, false),
+    Ok(match data_type {
+        DataType::Binary => Arc::new(BinaryArray::from_iter_values(repeat_n(b"", length))),
+        DataType::Boolean => Arc::new(BooleanArray::from_iter(repeat_n(Some(false), length))),
+        DataType::FixedSizeBinary(fsl_len) => {
+            Arc::new(FixedSizeBinaryArray::try_from_iter(repeat_n(vec![0; *fsl_len as usize], length)).expect("can create FSB array from iter of correct len"))
+        },
+        DataType::Float32 => Arc::new(Float32Array::from_iter_values(repeat_n(0.0, length))),
+        DataType::Float64 => Arc::new(Float64Array::from_iter_values(repeat_n(0.0, length))),
+        DataType::Int32 => Arc::new(Int32Array::from_iter_values(repeat_n(0, length))),
+        DataType::Int64 => Arc::new(Int64Array::from_iter_values(repeat_n(0, length))),
+        DataType::UInt8 => Arc::new(UInt8Array::from_iter_values(repeat_n(0, length))),
+        DataType::UInt16 => Arc::new(UInt16Array::from_iter_values(repeat_n(0, length))),
+        DataType::UInt32 => Arc::new(UInt32Array::from_iter_values(repeat_n(0, length))),
+        DataType::UInt64 => Arc::new(UInt64Array::from_iter_values(repeat_n(0, length))),
+        DataType::Utf8 => Arc::new(StringArray::from_iter(repeat_n(Some(""), length))),
+        DataType::Timestamp(time_unit, _) => match *time_unit {
+            TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter_values(repeat_n(0, length))),
+            _ => {
+                todo!()
+            }
+        },
+
+        DataType::Struct(fields) => get_struct_full_of_nulls_or_defaults(fields, length, false)?,
         _ => {
             todo!()
         }
-    }
+    })
 }
 
-fn get_struct_full_of_nulls(data_type: &Fields, struct_nullable: bool) -> Result {
-    todo!()
+/// creates a struct where all the columns are all null, or all default values if non-nullable.
+/// the intention is that this will be a stand-in for the struct column of a record batch that is
+/// missing some struct column.
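+/// e.g. the test below drops the `resource` struct from a logs batch and expects a stand-in
+/// struct whose child arrays (id, schema_url, dropped_attributes_count) are entirely null.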
+fn get_struct_full_of_nulls_or_defaults(fields: &Fields, length: usize, struct_nullable: bool) -> Result { + let mut new_fields = Vec::with_capacity(fields.len()); + let mut new_columns = Vec::with_capacity(fields.len()); + + for field in fields { + let new_column = if field.is_nullable() { + get_all_null_column(field.data_type(), length)? + } else { + get_all_default_value_column(field.data_type(), length)? + }; + new_fields.push(field.clone()); + new_columns.push(new_column); + } + + let nulls = (!struct_nullable).then(|| NullBuffer::new_valid(length)); + let struct_array = StructArray::new(Fields::from(new_fields), new_columns, nulls); + + Ok(Arc::new(struct_array)) } + +// template schemas for various data types: +// +// note: these shouldn't be interpreted a comprehensive reference for OTAP generally. while the +// schemas below do contain every field and their logical datatypes, they lack information such +// as where dictionary encoding is used and other metadata such as which fields are optional. a +// better reference would probably be whats in the otel-arrow go code in pkg/otel + +static RESOURCE_TEMPLATE_FIELDS: LazyLock = LazyLock::new(|| { + Fields::from(vec![ + Field::new(consts::ID, DataType::UInt16, true), + Field::new(consts::SCHEMA_URL, DataType::Utf8, true), + Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, true), + ]) +}); + +static SCOPE_TEMPLATE_FIELDS: LazyLock = LazyLock::new(|| { + Fields::from(vec![ + Field::new(consts::ID, DataType::UInt16, true), + Field::new(consts::NAME, DataType::Utf8, true), + Field::new(consts::VERSION, DataType::Utf8, true), + Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, true), + ]) +}); + +static LOGS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| { + Schema::new(vec![ + Field::new(consts::ID, DataType::UInt16, true), + Field::new( + consts::RESOURCE, + DataType::Struct(RESOURCE_TEMPLATE_FIELDS.clone()), + false, + ), + Field::new( + consts::SCOPE, + DataType::Struct(SCOPE_TEMPLATE_FIELDS.clone()), + true + ) + ]) +}); + +#[cfg(test)] +mod test { + use super::*; + use arrow::array::{RecordBatch, UInt16Array}; + + #[test] + fn test_coalesces_new_columns_with_empty_columns() { + let log_attrs_record_batch = RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new(consts::ID, DataType::UInt16, false), + // "resource" is missing, so we should insert a new non-nullable struct + Field::new( + consts::SCOPE, + DataType::Struct(Fields::from(vec![ + Field::new(consts::ID, DataType::UInt16, true), + // scope will have some missing columns (name, version, drop_attr's_count) + // so we should see them inserted + ])), + false + ) + ])), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])), + Arc::new(StructArray::new( + Fields::from(vec![ + Field::new(consts::ID, DataType::UInt16, true), + ]), + vec![ + Arc::new(UInt16Array::from_iter(vec![Some(0), None, Some(1)])) + ], + Some(NullBuffer::new_valid(3)) + )), + ] + ).unwrap(); + + let result = transform_to_known_schema(&log_attrs_record_batch, ArrowPayloadType::Logs).unwrap(); + + let expected_resource_fields = Fields::from(vec![ + Field::new(consts::ID, DataType::UInt16, true), + Field::new(consts::SCHEMA_URL, DataType::Utf8, true), + Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, true), + ]); + + let expected_scope_fields = Fields::from(vec![ + Field::new(consts::ID, DataType::UInt16, true), + Field::new(consts::NAME, DataType::Utf8, true), + Field::new(consts::VERSION, DataType::Utf8, true), + 
Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, true), + ]); + + let expected = RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new(consts::ID, DataType::UInt16, false), + Field::new( + consts::RESOURCE, + DataType::Struct(expected_resource_fields.clone()), + false + ), + Field::new( + consts::SCOPE, + DataType::Struct(expected_scope_fields.clone()), + false + ), + ])), + vec![ + Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])), + Arc::new( + StructArray::new( + expected_resource_fields.clone(), + vec![ + Arc::new(UInt16Array::new_null(3)), + Arc::new(StringArray::new_null(3)), + Arc::new(UInt32Array::new_null(3)), + ], + Some(NullBuffer::new_valid(3)) + ) + ), + Arc::new( + StructArray::new( + expected_scope_fields.clone(), + vec![ + Arc::new(UInt16Array::from_iter(vec![Some(0), None, Some(1)])), + Arc::new(StringArray::new_null(3)), + Arc::new(StringArray::new_null(3)), + Arc::new(UInt32Array::new_null(3)), + ], + Some(NullBuffer::new_valid(3)) + ) + ) + ] + ).unwrap(); + + assert_eq!(result, expected) + } +} \ No newline at end of file From 1d90be69b4e85f7214bde281db3b5ca675ffadfc Mon Sep 17 00:00:00 2001 From: albertlockett Date: Thu, 28 Aug 2025 08:35:18 -0400 Subject: [PATCH 3/7] cleaned up the implementation --- rust/otap-dataflow/Cargo.toml | 1 + rust/otap-dataflow/crates/otap/Cargo.toml | 1 + .../otap/src/parquet_exporter/schema.rs | 124 +++++++++++++----- rust/otel-arrow-rust/src/schema.rs | 14 +- 4 files changed, 109 insertions(+), 31 deletions(-) diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index e208c8d75..f0e6ec8d6 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -63,6 +63,7 @@ log = "0.4" criterion = "0.7.0" miette = { version="7.6.0", features = ["fancy"] } linkme = "0.3.33" +pretty_assertions = "1.4.1" proc-macro2 = "1.0" quote = "1.0" syn = { version = "2.0", features = ["full"] } diff --git a/rust/otap-dataflow/crates/otap/Cargo.toml b/rust/otap-dataflow/crates/otap/Cargo.toml index c36c9edba..aa64d86bb 100644 --- a/rust/otap-dataflow/crates/otap/Cargo.toml +++ b/rust/otap-dataflow/crates/otap/Cargo.toml @@ -61,5 +61,6 @@ weaver_common.workspace = true [dev-dependencies] portpicker = "0.1.1" +pretty_assertions = "1.4.1" tempfile.workspace = true url.workspace = true diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs index 37a07fac6..37f3c34d8 100644 --- a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs @@ -22,13 +22,13 @@ use std::iter::repeat_n; use std::sync::{Arc, LazyLock}; use arrow::array::{ - ArrayRef, BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array + Array, ArrayRef, BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array }; use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field, Fields, Schema, TimeUnit}; use otap_df_engine::error::Error; use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType; -use otel_arrow_rust::schema::consts; +use otel_arrow_rust::schema::{consts, FieldExt}; /// Transform the // TODO comments @@ -39,54 +39,108 @@ pub fn 
transform_to_known_schema(
     record_batch: &RecordBatch,
     payload_type: ArrowPayloadType,
 ) -> Result {
     let current_schema = record_batch.schema_ref();
     let template_schema = get_template_schema(payload_type);
 
-    let mut new_columns = Vec::with_capacity(template_schema.fields.len());
-    let mut new_fields = Vec::with_capacity(template_schema.fields.len());
+    let (new_columns, new_fields) = transform_to_known_schema_internal(
+        record_batch.num_rows(),
+        record_batch.columns(),
+        current_schema.fields(),
+        template_schema.fields()
+    )?;
 
-    for template_field in template_schema.fields() {
-        match current_schema.index_of(template_field.name()) {
-            Ok(current_field_index) => {
-                // TODO handle struct here
-                new_columns.push(record_batch.column(current_field_index).clone());
-                new_fields.push(current_schema.fields[current_field_index].clone());
+    // safety: this shouldn't fail b/c we're creating a record batch where the columns all have
+    // the correct length and their datatypes match the schema
+    let new_rb = RecordBatch::try_new(
+        Arc::new(Schema::new(new_fields)),
+        new_columns,
+    ).expect("unexpected error creating record batch with known schema");
+
+    Ok(new_rb)
+}
+
+fn transform_struct_to_known_schema(
+    num_rows: usize,
+    current_array: &StructArray,
+    template_fields: &Fields
+) -> Result {
+    let (new_columns, new_fields) = transform_to_known_schema_internal(
+        num_rows,
+        current_array.columns(),
+        current_array.fields(),
+        template_fields
+    )?;
+
+    Ok(StructArray::new(new_fields, new_columns, current_array.nulls().cloned()))
+}
+
+
+fn transform_to_known_schema_internal(
+    num_rows: usize,
+    current_columns: &[ArrayRef],
+    current_fields: &Fields,
+    template_fields: &Fields
+) -> Result<(Vec, Fields), Error> {
+    let mut new_columns = Vec::with_capacity(template_fields.len());
+    let mut new_fields = Vec::with_capacity(template_fields.len());
+
+    for template_field in template_fields {
+        // TODO -- the last 3 blocks of each of the some/none branches are the same here. We might
+        // be able to factor them out, but first need to figure out if we need to preserve the
+        // metadata.
+        match current_fields.find(template_field.name()) {
+            Some((current_field_index, current_field)) => {
+                // column exists, reuse the existing column..
+                // let current_field = &current_schema.fields[current_field_index];
+                let current_column = &current_columns[current_field_index];
+                let new_column = if let DataType::Struct(_) = current_field.data_type() {
+                    // handle struct column
+                    let new_struct_arr = transform_struct_to_known_schema(
+                        num_rows,
+                        // safety: we've just checked the datatype
+                        current_column.as_any().downcast_ref().expect("can downcast to struct"),
+
+                        // TODO -- need to return an error here if this is None b/c it means that
+                        // the record batch we received had a struct column where it shouldn't
+                        template_field.as_struct_fields().unwrap(),
+                    )?;
+
+                    Arc::new(new_struct_arr)
+                } else {
+                    // otherwise just keep the existing column
+                    current_column.clone()
+                };
+
+                // TODO if the datatypes are the same here, there's no need to create a new Arc
+                let new_field = current_field
+                    .as_ref()
+                    .clone()
+                    .with_data_type(new_column.data_type().clone());
 
                 new_columns.push(new_column);
                 new_fields.push(Arc::new(new_field));
             }
-            Err(_) => {
+
+            None => {
+                // column doesn't exist, add a new "empty" column..
                 let new_column = if template_field.is_nullable() {
-                    get_all_null_column(template_field.data_type(), record_batch.num_rows())?
+                    get_all_null_column(template_field.data_type(), num_rows)?
                 } else {
-                    get_all_default_value_column(
-                        template_field.data_type(),
-                        record_batch.num_rows(),
-                    )?
+                    get_all_default_value_column(template_field.data_type(), num_rows)?
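+                    // (a non-nullable column can't be backfilled with nulls, so zero/empty
+                    // default values are used instead)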
}; + + // TODO if the datatypes are the same here, there's no need to create a new Arc let new_field = template_field .as_ref() .clone() .with_data_type(new_column.data_type().clone()); - new_columns.push(new_column); new_fields.push(Arc::new(new_field)); } } } - // safety: this shouldn't fail b/c we're creating a record batch where the columns all have - // the correct length and their datatypes match the schema - let new_rb = RecordBatch::try_new( - Arc::new(Schema::new(new_fields)), - new_columns, - ).expect("unexpected error creating record batch with known schema"); - - Ok(new_rb) + Ok((new_columns, Fields::from(new_fields))) } -fn get_template_schema(payload_type: ArrowPayloadType) -> &'static Schema { - match payload_type { - ArrowPayloadType::Logs => &LOGS_TEMPLATE_SCHEMA, - _ => { - todo!() - } - } -} fn get_all_null_column(data_type: &DataType, length: usize) -> Result { // TODO once run-end encoding, we can save some memory here by allocating a single RunArray @@ -179,6 +233,15 @@ fn get_struct_full_of_nulls_or_defaults(fields: &Fields, length: usize, struct_n Ok(Arc::new(struct_array)) } +fn get_template_schema(payload_type: ArrowPayloadType) -> &'static Schema { + match payload_type { + ArrowPayloadType::Logs => &LOGS_TEMPLATE_SCHEMA, + _ => { + todo!() + } + } +} + // template schemas for various data types: // // note: these shouldn't be interpreted a comprehensive reference for OTAP generally. while the @@ -223,6 +286,7 @@ static LOGS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| { mod test { use super::*; use arrow::array::{RecordBatch, UInt16Array}; + use pretty_assertions::assert_eq; #[test] fn test_coalesces_new_columns_with_empty_columns() { diff --git a/rust/otel-arrow-rust/src/schema.rs b/rust/otel-arrow-rust/src/schema.rs index ac0f49ff2..12ce81baa 100644 --- a/rust/otel-arrow-rust/src/schema.rs +++ b/rust/otel-arrow-rust/src/schema.rs @@ -7,7 +7,7 @@ #![allow(missing_docs)] use arrow::array::{LargeListArray, RecordBatch}; -use arrow::datatypes::{Field, Schema}; +use arrow::datatypes::{DataType, Field, Fields, Schema}; use std::sync::Arc; pub mod consts; @@ -165,6 +165,10 @@ pub trait FieldExt { /// Sets the encoding column metadata key to "plain". fn with_plain_encoding(self) -> Self; + + /// tries to convert the `Field` into the inner fields. Returns `None` if the field's data_type + /// is not `Struct` + fn as_struct_fields(&self) -> Option<&Fields>; } impl FieldExt for Field { @@ -178,4 +182,12 @@ impl FieldExt for Field { fn with_plain_encoding(self) -> Self { self.with_encoding(consts::metadata::encodings::PLAIN) } + + fn as_struct_fields(&self) -> Option<&Fields> { + if let DataType::Struct(fields) = self.data_type() { + Some(fields) + } else { + None + } + } } From d945e2ec16b9a9c0a518b5b844bd3e38e8f1024e Mon Sep 17 00:00:00 2001 From: albertlockett Date: Thu, 28 Aug 2025 08:36:00 -0400 Subject: [PATCH 4/7] I ran cargo format --- .../otap/src/parquet_exporter/schema.rs | 159 +++++++++--------- 1 file changed, 83 insertions(+), 76 deletions(-) diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs index 37f3c34d8..833b9e108 100644 --- a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs @@ -6,10 +6,10 @@ //! //! The main reason we can't write the record batch verbatim is because OTAP considers some columns //! 
optional, but parquet spec requires that each row group contain a column chunk for every field -//! in the schema (and the column chunks must be in the correct order). -//! -//! This means we can't receive two consecutive OTAP batches for some payload type and write them -//! into the same writer. To handle this, we insert all null columns for missing columns (or all +//! in the schema (and the column chunks must be in the correct order). +//! +//! This means we can't receive two consecutive OTAP batches for some payload type and write them +//! into the same writer. To handle this, we insert all null columns for missing columns (or all //! default-value where the column is not nullable), and also arrange the columns so they're always //! in the same order. //! @@ -22,13 +22,15 @@ use std::iter::repeat_n; use std::sync::{Arc, LazyLock}; use arrow::array::{ - Array, ArrayRef, BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array + Array, ArrayRef, BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, + Int32Array, Int64Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray, + UInt8Array, UInt16Array, UInt32Array, UInt64Array, }; use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field, Fields, Schema, TimeUnit}; use otap_df_engine::error::Error; use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType; -use otel_arrow_rust::schema::{consts, FieldExt}; +use otel_arrow_rust::schema::{FieldExt, consts}; /// Transform the // TODO comments @@ -42,16 +44,14 @@ pub fn transform_to_known_schema( let (new_columns, new_fields) = transform_to_known_schema_internal( record_batch.num_rows(), record_batch.columns(), - current_schema.fields(), - template_schema.fields() + current_schema.fields(), + template_schema.fields(), )?; // safety: this shouldn't fail b/c we're creating a record batch where the columns all have // the correct length and their datatypes match the schema - let new_rb = RecordBatch::try_new( - Arc::new(Schema::new(new_fields)), - new_columns, - ).expect("unexpected error creating record batch with known schema"); + let new_rb = RecordBatch::try_new(Arc::new(Schema::new(new_fields)), new_columns) + .expect("unexpected error creating record batch with known schema"); Ok(new_rb) } @@ -59,24 +59,27 @@ pub fn transform_to_known_schema( fn transform_struct_to_known_schema( num_rows: usize, current_array: &StructArray, - template_fields: &Fields + template_fields: &Fields, ) -> Result { let (new_columns, new_fields) = transform_to_known_schema_internal( - num_rows, + num_rows, current_array.columns(), - current_array.fields(), - template_fields + current_array.fields(), + template_fields, )?; - Ok(StructArray::new(new_fields, new_columns, current_array.nulls().cloned())) + Ok(StructArray::new( + new_fields, + new_columns, + current_array.nulls().cloned(), + )) } - fn transform_to_known_schema_internal( num_rows: usize, current_columns: &[ArrayRef], current_fields: &Fields, - template_fields: &Fields + template_fields: &Fields, ) -> Result<(Vec, Fields), Error> { let mut new_columns = Vec::with_capacity(template_fields.len()); let mut new_fields = Vec::with_capacity(template_fields.len()); @@ -94,8 +97,10 @@ fn transform_to_known_schema_internal( let new_struct_arr = transform_struct_to_known_schema( num_rows, // safety: we've just checked the datatype - 
current_column.as_any().downcast_ref().expect("can downcast to struct"), - + current_column + .as_any() + .downcast_ref() + .expect("can downcast to struct"), // TODO -- need to return an error here if this is None b/c it means that // the record batch we received had a struct column where it shouldn't template_field.as_struct_fields().unwrap(), @@ -116,17 +121,14 @@ fn transform_to_known_schema_internal( new_fields.push(Arc::new(new_field)); } - None => { + None => { // column doesn't exist, add a new "empty" column.. let new_column = if template_field.is_nullable() { get_all_null_column(template_field.data_type(), num_rows)? } else { - get_all_default_value_column( - template_field.data_type(), - num_rows - )? + get_all_default_value_column(template_field.data_type(), num_rows)? }; - + // TODO if the datatypes are the same here, there's no need to create a new Arc let new_field = template_field .as_ref() @@ -141,7 +143,6 @@ fn transform_to_known_schema_internal( Ok((new_columns, Fields::from(new_fields))) } - fn get_all_null_column(data_type: &DataType, length: usize) -> Result { // TODO once run-end encoding, we can save some memory here by allocating a single RunArray // with one null value, and one run-end of `length`. This would allow us to allocate a few @@ -184,9 +185,10 @@ fn get_all_default_value_column(data_type: &DataType, length: usize) -> Result Arc::new(BinaryArray::from_iter_values(repeat_n(b"", length))), DataType::Boolean => Arc::new(BooleanArray::from_iter(repeat_n(Some(false), length))), - DataType::FixedSizeBinary(fsl_len) => { - Arc::new(FixedSizeBinaryArray::try_from_iter(repeat_n(vec![0; *fsl_len as usize], length)).expect("can create FSB array from iter of correct len")) - }, + DataType::FixedSizeBinary(fsl_len) => Arc::new( + FixedSizeBinaryArray::try_from_iter(repeat_n(vec![0; *fsl_len as usize], length)) + .expect("can create FSB array from iter of correct len"), + ), DataType::Float32 => Arc::new(Float32Array::from_iter_values(repeat_n(0.0, length))), DataType::Float64 => Arc::new(Float64Array::from_iter_values(repeat_n(0.0, length))), DataType::Int32 => Arc::new(Int32Array::from_iter_values(repeat_n(0, length))), @@ -197,7 +199,9 @@ fn get_all_default_value_column(data_type: &DataType, length: usize) -> Result Arc::new(UInt64Array::from_iter_values(repeat_n(0, length))), DataType::Utf8 => Arc::new(StringArray::from_iter(repeat_n(Some(""), length))), DataType::Timestamp(time_unit, _) => match *time_unit { - TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter_values(repeat_n(0, length))), + TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter_values(repeat_n( + 0, length, + ))), _ => { todo!() } @@ -213,7 +217,11 @@ fn get_all_default_value_column(data_type: &DataType, length: usize) -> Result Result { +fn get_struct_full_of_nulls_or_defaults( + fields: &Fields, + length: usize, + struct_nullable: bool, +) -> Result { let mut new_fields = Vec::with_capacity(fields.len()); let mut new_columns = Vec::with_capacity(fields.len()); @@ -229,7 +237,7 @@ fn get_struct_full_of_nulls_or_defaults(fields: &Fields, length: usize, struct_n let nulls = (!struct_nullable).then(|| NullBuffer::new_valid(length)); let struct_array = StructArray::new(Fields::from(new_fields), new_columns, nulls); - + Ok(Arc::new(struct_array)) } @@ -277,8 +285,8 @@ static LOGS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| { Field::new( consts::SCOPE, DataType::Struct(SCOPE_TEMPLATE_FIELDS.clone()), - true - ) + true, + ), ]) }); @@ -301,24 +309,26 @@ mod test { // 
scope will have some missing columns (name, version, drop_attr's_count) // so we should see them inserted ])), - false - ) + false, + ), ])), vec![ Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])), Arc::new(StructArray::new( - Fields::from(vec![ - Field::new(consts::ID, DataType::UInt16, true), - ]), - vec![ - Arc::new(UInt16Array::from_iter(vec![Some(0), None, Some(1)])) - ], - Some(NullBuffer::new_valid(3)) + Fields::from(vec![Field::new(consts::ID, DataType::UInt16, true)]), + vec![Arc::new(UInt16Array::from_iter(vec![ + Some(0), + None, + Some(1), + ]))], + Some(NullBuffer::new_valid(3)), )), - ] - ).unwrap(); + ], + ) + .unwrap(); - let result = transform_to_known_schema(&log_attrs_record_batch, ArrowPayloadType::Logs).unwrap(); + let result = + transform_to_known_schema(&log_attrs_record_batch, ArrowPayloadType::Logs).unwrap(); let expected_resource_fields = Fields::from(vec![ Field::new(consts::ID, DataType::UInt16, true), @@ -339,42 +349,39 @@ mod test { Field::new( consts::RESOURCE, DataType::Struct(expected_resource_fields.clone()), - false + false, ), Field::new( consts::SCOPE, DataType::Struct(expected_scope_fields.clone()), - false + false, ), ])), vec![ Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])), - Arc::new( - StructArray::new( - expected_resource_fields.clone(), - vec![ - Arc::new(UInt16Array::new_null(3)), - Arc::new(StringArray::new_null(3)), - Arc::new(UInt32Array::new_null(3)), - ], - Some(NullBuffer::new_valid(3)) - ) - ), - Arc::new( - StructArray::new( - expected_scope_fields.clone(), - vec![ - Arc::new(UInt16Array::from_iter(vec![Some(0), None, Some(1)])), - Arc::new(StringArray::new_null(3)), - Arc::new(StringArray::new_null(3)), - Arc::new(UInt32Array::new_null(3)), - ], - Some(NullBuffer::new_valid(3)) - ) - ) - ] - ).unwrap(); + Arc::new(StructArray::new( + expected_resource_fields.clone(), + vec![ + Arc::new(UInt16Array::new_null(3)), + Arc::new(StringArray::new_null(3)), + Arc::new(UInt32Array::new_null(3)), + ], + Some(NullBuffer::new_valid(3)), + )), + Arc::new(StructArray::new( + expected_scope_fields.clone(), + vec![ + Arc::new(UInt16Array::from_iter(vec![Some(0), None, Some(1)])), + Arc::new(StringArray::new_null(3)), + Arc::new(StringArray::new_null(3)), + Arc::new(UInt32Array::new_null(3)), + ], + Some(NullBuffer::new_valid(3)), + )), + ], + ) + .unwrap(); assert_eq!(result, expected) } -} \ No newline at end of file +} From 88027bc1eccac406b1978cece08cc6b3e53cb4bc Mon Sep 17 00:00:00 2001 From: albertlockett Date: Thu, 28 Aug 2025 09:40:29 -0400 Subject: [PATCH 5/7] added template structs for every payload type --- .../otap/src/parquet_exporter/schema.rs | 383 +++++++++++++++++- 1 file changed, 380 insertions(+), 3 deletions(-) diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs index 833b9e108..65c295f34 100644 --- a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs @@ -28,8 +28,11 @@ use arrow::array::{ }; use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field, Fields, Schema, TimeUnit}; +use futures::future::Lazy; use otap_df_engine::error::Error; +use otap_df_otlp::proto::opentelemetry::metrics::v1::metric::Data; use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType; +use otel_arrow_rust::schema::consts::ATTRIBUTE_SER; use otel_arrow_rust::schema::{FieldExt, consts}; /// Transform the @@ -150,7 +153,7 @@ fn 
get_all_null_column(data_type: &DataType, length: usize) -> Result Arc::new(BinaryArray::new_null(length)), - DataType::Boolean => Arc::new(BinaryArray::new_null(length)), + DataType::Boolean => Arc::new(BooleanArray::new_null(length)), DataType::FixedSizeBinary(fsl_len) => { Arc::new(FixedSizeBinaryArray::new_null(*fsl_len, length)) } @@ -244,7 +247,35 @@ fn get_struct_full_of_nulls_or_defaults( fn get_template_schema(payload_type: ArrowPayloadType) -> &'static Schema { match payload_type { ArrowPayloadType::Logs => &LOGS_TEMPLATE_SCHEMA, - _ => { + ArrowPayloadType::UnivariateMetrics | ArrowPayloadType::MultivariateMetrics => { + &METRICS_TEMPLATE_SCHEMA + } + ArrowPayloadType::SummaryDataPoints => &SUMMARY_DP_TEMPLATE_SCHEMA, + ArrowPayloadType::NumberDataPoints => &NUMBERS_DP_TEMPLATE_SCHEMA, + ArrowPayloadType::HistogramDataPoints => &HISTOGRAM_DP_TEMPLATE_SCHEMA, + ArrowPayloadType::ExpHistogramDataPoints => &EXP_HISTOGRAM_DP_TEMPLATE_SCHEMA, + ArrowPayloadType::NumberDpExemplars + | ArrowPayloadType::HistogramDpExemplars + | ArrowPayloadType::ExpHistogramDpExemplars => &EXEMPLAR_TEMPLATE_SCHEMA, + ArrowPayloadType::Spans => &SPANS_TEMPLATE_SCHEMA, + ArrowPayloadType::SpanLinks => &SPAN_LINKS_TEMPLATE_SCHEMA, + ArrowPayloadType::SpanEvents => &SPAN_EVENTS_TEMPLATE_SCHEMA, + ArrowPayloadType::ResourceAttrs + | ArrowPayloadType::ScopeAttrs + | ArrowPayloadType::MetricAttrs + | ArrowPayloadType::SpanAttrs + | ArrowPayloadType::LogAttrs => &ATTRS_16_TEMPLATE_SCHEMA, + ArrowPayloadType::SpanLinkAttrs + | ArrowPayloadType::SpanEventAttrs + | ArrowPayloadType::NumberDpAttrs + | ArrowPayloadType::SummaryDpAttrs + | ArrowPayloadType::HistogramDpAttrs + | ArrowPayloadType::ExpHistogramDpAttrs + | ArrowPayloadType::HistogramDpExemplarAttrs + | ArrowPayloadType::NumberDpExemplarAttrs + | ArrowPayloadType::ExpHistogramDpExemplarAttrs => &ATTRS_32_TEMPLATE_SCHEMA, + ArrowPayloadType::Unknown => { + // TODO need to return error here? 
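+            // (there is no template schema to map an Unknown payload to)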
todo!()
        }
    }
}

static LOGS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| {
    Schema::new(vec![
        Field::new(consts::ID, DataType::UInt16, true),
        Field::new(
            consts::RESOURCE,
            DataType::Struct(RESOURCE_TEMPLATE_FIELDS.clone()),
            false,
        ),
        Field::new(
            consts::SCOPE,
            DataType::Struct(SCOPE_TEMPLATE_FIELDS.clone()),
            true,
        ),
        Field::new(consts::SCHEMA_URL, DataType::Utf8, false),
        Field::new(
            consts::TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new(
            consts::OBSERVED_TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new(consts::TRACE_ID, DataType::FixedSizeBinary(16), true),
        Field::new(consts::SPAN_ID, DataType::FixedSizeBinary(8), true),
        Field::new(consts::SEVERITY_NUMBER, DataType::Int32, true),
        Field::new(consts::SEVERITY_TEXT, DataType::Utf8, true),
        Field::new(
            consts::BODY,
            DataType::Struct(Fields::from(vec![
                Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, true),
                Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true),
                Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true),
                Field::new(consts::ATTRIBUTE_DOUBLE, DataType::Float64, true),
                Field::new(consts::ATTRIBUTE_BOOL, DataType::Boolean, true),
                Field::new(consts::ATTRIBUTE_BYTES, DataType::Binary, true),
                Field::new(consts::ATTRIBUTE_SER, DataType::Binary, true),
            ])),
            true,
        ),
        Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, false),
        Field::new(consts::FLAGS, DataType::UInt32, false),
    ])
});

static METRICS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| {
    Schema::new(vec![
        Field::new(consts::ID, DataType::UInt16, true),
        Field::new(
            consts::RESOURCE,
            DataType::Struct(RESOURCE_TEMPLATE_FIELDS.clone()),
            false,
        ),
        Field::new(
            consts::SCOPE,
            DataType::Struct(SCOPE_TEMPLATE_FIELDS.clone()),
            false,
        ),
        Field::new(consts::SCHEMA_URL, DataType::Utf8, false),
        Field::new(consts::METRIC_TYPE, DataType::UInt8, false),
        Field::new(consts::NAME, DataType::Utf8, false),
        Field::new(consts::DESCRIPTION, DataType::Utf8, false),
        Field::new(consts::UNIT, DataType::Utf8, false),
        Field::new(consts::AGGREGATION_TEMPORALITY, DataType::Int32, true),
        Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, true),
    ])
});

static NUMBERS_DP_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| {
    Schema::new(vec![
        Field::new(consts::ID, DataType::UInt32, false),
        Field::new(consts::PARENT_ID, DataType::UInt16, false),
        Field::new(
            consts::START_TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        ),
        Field::new(
            consts::TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        ),
        Field::new(consts::INT_VALUE, DataType::Int64, true),
        Field::new(consts::DOUBLE_VALUE, DataType::Float64, true),
        Field::new(consts::FLAGS, DataType::UInt32, false),
    ])
});

static SUMMARY_DP_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| {
    Schema::new(vec![
        Field::new(consts::ID, DataType::UInt32, false),
        Field::new(consts::PARENT_ID, DataType::UInt16, false),
        Field::new(
            consts::START_TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new(
            consts::TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new(consts::SUMMARY_COUNT, DataType::UInt64, false),
        Field::new(consts::SUMMARY_SUM, DataType::Float64, false),
        Field::new(
            consts::SUMMARY_QUANTILE_VALUES,
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new(consts::SUMMARY_QUANTILE, DataType::Float64, false),
                    Field::new(consts::SUMMARY_VALUE, DataType::Float64, false),
                ])),
                true,
            ))),
            false,
        ),
Field::new(consts::FLAGS, DataType::UInt32, false),
    ])
});

static HISTOGRAM_DP_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| {
    Schema::new(vec![
        Field::new(consts::ID, DataType::UInt32, false),
        Field::new(consts::PARENT_ID, DataType::UInt16, false),
        Field::new(
            consts::START_TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new(
            consts::TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new(consts::HISTOGRAM_COUNT, DataType::UInt64, false),
        Field::new(consts::HISTOGRAM_SUM, DataType::Float64, true),
        Field::new(
            consts::HISTOGRAM_BUCKET_COUNTS,
            DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))),
            false,
        ),
        Field::new(
            consts::HISTOGRAM_EXPLICIT_BOUNDS,
            DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
            true,
        ),
        Field::new(consts::FLAGS, DataType::UInt32, false),
        Field::new(consts::HISTOGRAM_MIN, DataType::Float64, true),
        Field::new(consts::HISTOGRAM_MAX, DataType::Float64, true),
    ])
});

static EXP_HISTOGRAM_DP_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| {
    Schema::new(vec![
        Field::new(consts::ID, DataType::UInt32, false),
        Field::new(consts::PARENT_ID, DataType::UInt16, false),
        Field::new(
            consts::START_TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new(
            consts::TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new(consts::HISTOGRAM_COUNT, DataType::UInt64, false),
        Field::new(consts::HISTOGRAM_SUM, DataType::Float64, true),
        Field::new(consts::EXP_HISTOGRAM_SCALE, DataType::Int32, false),
        Field::new(consts::EXP_HISTOGRAM_ZERO_COUNT, DataType::UInt64, false),
        Field::new(
            consts::EXP_HISTOGRAM_POSITIVE,
            DataType::Struct(Fields::from(vec![
                Field::new(consts::EXP_HISTOGRAM_OFFSET, DataType::Int32, false),
                Field::new(
                    consts::EXP_HISTOGRAM_BUCKET_COUNTS,
                    DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))),
                    false,
                ),
            ])),
            false,
        ),
        Field::new(
            consts::EXP_HISTOGRAM_NEGATIVE,
            DataType::Struct(Fields::from(vec![
                Field::new(consts::EXP_HISTOGRAM_OFFSET, DataType::Int32, false),
                Field::new(
                    consts::EXP_HISTOGRAM_BUCKET_COUNTS,
                    DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))),
                    false,
                ),
            ])),
            false,
        ),
        Field::new(consts::FLAGS, DataType::UInt32, false),
        Field::new(consts::HISTOGRAM_MIN, DataType::Float64, true),
        Field::new(consts::HISTOGRAM_MAX, DataType::Float64, true),
    ])
});

static EXEMPLAR_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| {
    Schema::new(vec![
        Field::new(consts::ID, DataType::UInt32, true),
        Field::new(consts::PARENT_ID, DataType::UInt32, false),
        Field::new(
            consts::TIME_UNIX_NANO,
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            false,
        ),
        Field::new(consts::INT_VALUE, DataType::Int64, false),
        Field::new(consts::DOUBLE_VALUE, DataType::Float64, false),
        Field::new(consts::SPAN_ID, DataType::FixedSizeBinary(8), true),
        Field::new(consts::TRACE_ID, DataType::FixedSizeBinary(16), true),
    ])
});

static SPANS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| {
    Schema::new(vec![
        Field::new(consts::ID, DataType::UInt16, true),
        Field::new(
            consts::RESOURCE,
            DataType::Struct(RESOURCE_TEMPLATE_FIELDS.clone()),
            true,
        ),
        Field::new(
            consts::SCOPE,
            DataType::Struct(SCOPE_TEMPLATE_FIELDS.clone()),
            true,
        ),
        Field::new(consts::SCHEMA_URL, DataType::Utf8, false),
        Field::new(
consts::START_TIME_UNIX_NANO, + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new( + consts::DURATION_TIME_UNIX_NANO, + DataType::Duration(TimeUnit::Nanosecond), + false, + ), + Field::new(consts::TRACE_ID, DataType::FixedSizeBinary(16), false), + Field::new(consts::SPAN_ID, DataType::FixedSizeBinary(8), false), + Field::new(consts::TRACE_STATE, DataType::Utf8, true), + Field::new(consts::PARENT_SPAN_ID, DataType::FixedSizeBinary(8), true), + Field::new(consts::NAME, DataType::Utf8, false), + Field::new(consts::KIND, DataType::Int32, true), + Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, true), + Field::new(consts::DROPPED_EVENTS_COUNT, DataType::UInt32, true), + Field::new(consts::DROPPED_LINKS_COUNT, DataType::UInt32, true), + Field::new( + consts::STATUS, + DataType::Struct(Fields::from(vec![ + Field::new(consts::STATUS_CODE, DataType::Int32, true), + Field::new(consts::STATUS_MESSAGE, DataType::Utf8, true), + ])), + true, + ), + ]) +}); + +static SPAN_EVENTS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| { + Schema::new(vec![ + Field::new(consts::ID, DataType::UInt32, true), + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new( + consts::TIME_UNIX_NANO, + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ), + Field::new(consts::NAME, DataType::Utf8, false), + Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, true), + ]) +}); + +static SPAN_LINKS_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| { + Schema::new(vec![ + Field::new(consts::ID, DataType::UInt32, true), + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::TRACE_ID, DataType::FixedSizeBinary(16), true), + Field::new(consts::SPAN_ID, DataType::FixedSizeBinary(8), true), + Field::new(consts::TRACE_STATE, DataType::Utf8, true), + Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, true), + ]) +}); + +static ATTRS_16_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| { + Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt16, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true), + Field::new(consts::ATTRIBUTE_DOUBLE, DataType::Float64, true), + Field::new(consts::ATTRIBUTE_BOOL, DataType::Boolean, true), + Field::new(consts::ATTRIBUTE_BYTES, DataType::Binary, true), + Field::new(consts::ATTRIBUTE_SER, DataType::Binary, true), + ]) +}); + +static ATTRS_32_TEMPLATE_SCHEMA: LazyLock = LazyLock::new(|| { + Schema::new(vec![ + Field::new(consts::PARENT_ID, DataType::UInt32, false), + Field::new(consts::ATTRIBUTE_KEY, DataType::Utf8, false), + Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, false), + Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true), + Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true), + Field::new(consts::ATTRIBUTE_DOUBLE, DataType::Float64, true), + Field::new(consts::ATTRIBUTE_BOOL, DataType::Boolean, true), + Field::new(consts::ATTRIBUTE_BYTES, DataType::Binary, true), + Field::new(consts::ATTRIBUTE_SER, DataType::Binary, true), ]) }); #[cfg(test)] mod test { use super::*; - use arrow::array::{RecordBatch, UInt16Array}; + use arrow::array::{RecordBatch, RecordBatchOptions, UInt16Array}; use pretty_assertions::assert_eq; #[test] @@ -311,6 +640,11 @@ mod test { ])), false, ), + // TODO fill in other columns: + // 1 - default null/values on FSL + // 2 - nullable struct (body?) 
+ // 3 - keeps dicts + // ... ])), vec![ Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])), @@ -384,4 +718,47 @@ mod test { assert_eq!(result, expected) } + + #[test] + fn test_generate_all_null_record_batch() { + // this is just a smoke test to ensure that all of our null/empty column generating code + // isn't missing any types that are actually used in one of the template schemas + for payload_type in [ + ArrowPayloadType::Logs, + ArrowPayloadType::UnivariateMetrics, + ArrowPayloadType::MultivariateMetrics, + ArrowPayloadType::SummaryDataPoints, + ArrowPayloadType::NumberDataPoints, + ArrowPayloadType::HistogramDataPoints, + ArrowPayloadType::ExpHistogramDataPoints, + ArrowPayloadType::NumberDpExemplars, + ArrowPayloadType::HistogramDpExemplars, + ArrowPayloadType::ExpHistogramDpExemplars, + ArrowPayloadType::Spans, + ArrowPayloadType::SpanLinks, + ArrowPayloadType::SpanEvents, + ArrowPayloadType::ResourceAttrs, + ArrowPayloadType::ScopeAttrs, + ArrowPayloadType::MetricAttrs, + ArrowPayloadType::SpanAttrs, + ArrowPayloadType::LogAttrs, + ArrowPayloadType::SpanLinkAttrs, + ArrowPayloadType::SpanEventAttrs, + ArrowPayloadType::NumberDpAttrs, + ArrowPayloadType::SummaryDpAttrs, + ArrowPayloadType::HistogramDpAttrs, + ArrowPayloadType::ExpHistogramDpAttrs, + ArrowPayloadType::HistogramDpExemplarAttrs, + ArrowPayloadType::NumberDpExemplarAttrs, + ArrowPayloadType::ExpHistogramDpExemplarAttrs, + ] { + let input = RecordBatch::try_new_with_options( + Arc::new(Schema::empty()), + vec![], + &RecordBatchOptions::new().with_row_count(Some(1)), + ) + .unwrap(); + _ = transform_to_known_schema(&input, payload_type).unwrap(); + } + } } From 83270d7def2eca1daa62b2813e4ebd323e52e4cf Mon Sep 17 00:00:00 2001 From: albertlockett Date: Thu, 28 Aug 2025 11:57:03 -0400 Subject: [PATCH 6/7] finished testing some edge cases --- .../crates/otap/src/parquet_exporter.rs | 72 +++++ .../crates/otap/src/parquet_exporter/error.rs | 12 + .../crates/otap/src/parquet_exporter/idgen.rs | 20 +- .../otap/src/parquet_exporter/schema.rs | 298 ++++++++++++++---- 4 files changed, 320 insertions(+), 82 deletions(-) create mode 100644 rust/otap-dataflow/crates/otap/src/parquet_exporter/error.rs diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs index 23cd8c8b6..a45ce9572 100644 --- a/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs @@ -20,6 +20,7 @@ //! See the [GitHub issue](https://github.com/open-telemetry/otel-arrow/issues/399) for more details. 
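 //!
 //! Before partitioning or writing, each record batch is first normalized to a stable
 //! per-payload-type schema (see the `schema` module), so a single parquet writer can accept
 //! consecutive OTAP batches whose optional columns differ.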
use crate::OTAP_EXPORTER_FACTORIES;
+use crate::parquet_exporter::schema::transform_to_known_schema;
 use crate::pdata::OtapPdata;
 use std::io::ErrorKind;
 use std::sync::Arc;
@@ -45,6 +46,7 @@ use otap_df_engine::node::NodeId;
 use otel_arrow_rust::otap::OtapArrowRecords;
 
 mod config;
+mod error;
 mod idgen;
 mod object_store;
 mod partition;
@@ -190,6 +192,15 @@ impl Exporter for ParquetExporter {
             });
         }
 
+        // ensure the batches have the schema the parquet writer expects
+        transform_to_known_schema(&mut otap_batch).map_err(|e| {
+            // TODO - Ack/Nack instead of returning error
+            Error::ExporterError {
+                exporter: effect_handler.exporter_id(),
+                error: format!("Schema transformation failed: {e}"),
+            }
+        })?;
+
         // compute any partitions
         let partitions = match self.config.partitioning_strategies.as_ref() {
             Some(strategies) => partition(&otap_batch, strategies),
@@ -410,6 +421,67 @@ mod test {
         });
     }
 
+    #[test]
+    fn test_adaptive_schema_optional_columns() {
+        let test_runtime = TestRuntime::::new();
+        let temp_dir = tempfile::tempdir().unwrap();
+        let base_dir: String = temp_dir.path().to_str().unwrap().into();
+        let exporter = ParquetExporter::new(config::Config {
+            base_uri: base_dir.clone(),
+            partitioning_strategies: None,
+            writer_options: None,
+        });
+        let node_config = Arc::new(NodeUserConfig::new_exporter_config(PARQUET_EXPORTER_URN));
+        let exporter = ExporterWrapper::::local::(
+            exporter,
+            test_node(test_runtime.config().name.clone()),
+            node_config,
+            test_runtime.config(),
+        );
+
+        test_runtime
+            .set_exporter(exporter)
+            .run_test(move |ctx| {
+                Box::pin(async move {
+                    let batch1: OtapArrowRecords =
+                        fixtures::create_single_logs_pdata_with_attrs(vec![KeyValue {
+                            key: "strkey".to_string(),
+                            value: Some(AnyValue::new_string("terry")),
+                        }])
+                        .try_into()
+                        .unwrap();
+
+                    let batch2: OtapArrowRecords =
+                        fixtures::create_single_logs_pdata_with_attrs(vec![KeyValue {
+                            key: "intkey".to_string(),
+                            value: Some(AnyValue::new_int(418)),
+                        }])
+                        .try_into()
+                        .unwrap();
+
+                    // double check that these contain schemas that are not the same ...
+ let batch1_attrs = batch1.get(ArrowPayloadType::LogAttrs).unwrap(); + let batch2_attrs = batch2.get(ArrowPayloadType::LogAttrs).unwrap(); + assert_ne!(batch1_attrs.schema(), batch2_attrs.schema()); + + ctx.send_pdata(batch1.into()).await.unwrap(); + ctx.send_pdata(batch2.into()).await.unwrap(); + + ctx.send_shutdown(Duration::from_millis(200), "test completed") + .await + .unwrap(); + }) + }) + .run_validation(move |_ctx, exporter_result| { + Box::pin(async move { + // check no error + exporter_result.unwrap(); + assert_parquet_file_has_rows(&base_dir, ArrowPayloadType::Logs, 2).await; + assert_parquet_file_has_rows(&base_dir, ArrowPayloadType::LogAttrs, 2).await; + }) + }) + } + async fn wait_table_exists(base_dir: &str, payload_type: ArrowPayloadType) { let table_name = payload_type.as_str_name().to_lowercase(); loop { diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter/error.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter/error.rs new file mode 100644 index 000000000..3c59218fb --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter/error.rs @@ -0,0 +1,12 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +/// Definition of errors that could happen when exporting OTAP batches to Parquet +#[derive(thiserror::Error, Debug)] +pub enum ParquetExporterError { + #[error("Invalid record batch: {error}")] + InvalidRecordBatch { error: String }, + + #[error("Unknown error occurred: {error}")] + UnknownError { error: String }, +} diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter/idgen.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter/idgen.rs index 7032f37e1..f7b92dc81 100644 --- a/rust/otap-dataflow/crates/otap/src/parquet_exporter/idgen.rs +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter/idgen.rs @@ -16,15 +16,7 @@ use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType; use otel_arrow_rust::schema::{consts, update_schema_metadata}; use uuid::Uuid; -/// Definition of errors that could happen when generating an ID -#[derive(thiserror::Error, Debug)] -pub enum IdGeneratorError { - #[error("Invalid record batch: {error}")] - InvalidRecordBatch { error: String }, - - #[error("Unknown error occurred: {error}")] - UnknownError { error: String }, -} +use super::error::ParquetExporterError; /// This is an ID generator that converts the batch IDs (which are only unique within a given /// OTAP Batch) into IDs that can be treated as unique. @@ -61,12 +53,12 @@ impl PartitionSequenceIdGenerator { pub fn generate_unique_ids( &mut self, otap_batch: &mut OtapArrowRecords, - ) -> Result<(), IdGeneratorError> { + ) -> Result<(), ParquetExporterError> { // decode the transport optimized IDs if they are not already decoded. This is needed // to ensure that we are not computing the sequence of IDs based on delta-encoded IDs. // If the IDs are already decoded, this is a no-op. 
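        // (transport-optimized IDs may be delta-encoded and are only unique within a single
        // OTAP batch, so they must be materialized before being offset by `curr_max` below)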
         otap_batch.decode_transport_optimized_ids().map_err(|e| {
-            IdGeneratorError::InvalidRecordBatch {
+            ParquetExporterError::InvalidRecordBatch {
                 error: format!("failed to decode transport optimized IDs: {e}"),
             }
         })?;
@@ -157,7 +149,7 @@ impl PartitionSequenceIdGenerator {
         &self,
         column_name: &str,
         record_batch: &RecordBatch,
-    ) -> Result<UInt32Array, IdGeneratorError> {
+    ) -> Result<UInt32Array, ParquetExporterError> {
         let schema = record_batch.schema_ref();
         let id_column_index = match schema.index_of(column_name) {
             Ok(index) => index,
@@ -172,7 +164,7 @@ impl PartitionSequenceIdGenerator {
         let id_column = match cast(id_column, &DataType::UInt32) {
             Ok(id_col) => id_col,
             Err(err) => {
-                return Err(IdGeneratorError::InvalidRecordBatch {
+                return Err(ParquetExporterError::InvalidRecordBatch {
                     error: format!("could not cast ID column to UInt32: {err}"),
                 });
             }
@@ -180,7 +172,7 @@ impl PartitionSequenceIdGenerator {
         let new_id_col = match add(&UInt32Array::new_scalar(self.curr_max), &id_column) {
             Ok(col) => col,
             Err(e) => {
-                return Err(IdGeneratorError::UnknownError {
+                return Err(ParquetExporterError::UnknownError {
                     error: format!("{e}"),
                 });
             }
diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs
index 65c295f34..7d83b4207 100644
--- a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs
+++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs
@@ -22,27 +22,38 @@ use std::iter::repeat_n;
 use std::sync::{Arc, LazyLock};

 use arrow::array::{
-    Array, ArrayRef, BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array,
-    Int32Array, Int64Array, RecordBatch, StringArray, StructArray, TimestampNanosecondArray,
-    UInt8Array, UInt16Array, UInt32Array, UInt64Array,
+    Array, ArrayRef, BinaryArray, BooleanArray, DurationNanosecondArray, FixedSizeBinaryArray,
+    Float32Array, Float64Array, Int32Array, Int64Array, ListArray, RecordBatch, StringArray,
+    StructArray, TimestampNanosecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
 };
-use arrow::buffer::NullBuffer;
+use arrow::buffer::{NullBuffer, OffsetBuffer};
 use arrow::datatypes::{DataType, Field, Fields, Schema, TimeUnit};
-use futures::future::Lazy;
-use otap_df_engine::error::Error;
-use otap_df_otlp::proto::opentelemetry::metrics::v1::metric::Data;
+use otel_arrow_rust::otap::OtapArrowRecords;
 use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType;
-use otel_arrow_rust::schema::consts::ATTRIBUTE_SER;
 use otel_arrow_rust::schema::{FieldExt, consts};

-/// Transform the
-// TODO comments
+use super::error::ParquetExporterError;
+
+/// Transform any record batch in this otap batch into a schema that the parquet writer expects.
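+///
+/// A hypothetical usage sketch (assuming the exporter context above; this doc example is
+/// illustrative and not part of the diff):
+///
+/// ```ignore
+/// // after receiving an OTAP batch in the exporter:
+/// transform_to_known_schema(&mut otap_batch)?;
+/// // each payload's record batch now matches its payload type's template schema,
+/// // so consecutive batches can be appended to the same parquet writer
+/// ```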
 pub fn transform_to_known_schema(
+    otap_batch: &mut OtapArrowRecords,
+) -> Result<(), ParquetExporterError> {
+    for payload_type in otap_batch.allowed_payload_types() {
+        if let Some(rb) = otap_batch.get(*payload_type) {
+            let rb = transform_record_batch_to_known_schema(rb, *payload_type)?;
+            otap_batch.set(*payload_type, rb);
+        }
+    }
+
+    Ok(())
+}
+
+fn transform_record_batch_to_known_schema(
     record_batch: &RecordBatch,
     payload_type: ArrowPayloadType,
-) -> Result<RecordBatch, Error> {
+) -> Result<RecordBatch, ParquetExporterError> {
     let current_schema = record_batch.schema_ref();
-    let template_schema = get_template_schema(payload_type);
+    let template_schema = get_template_schema(payload_type)?;

     let (new_columns, new_fields) = transform_to_known_schema_internal(
         record_batch.num_rows(),
@@ -51,9 +62,13 @@ pub fn transform_to_known_schema(
         template_schema.fields(),
     )?;

+    // important to preserve the schema metadata, as it may be used for partitioning ..
+    let new_schema =
+        Arc::new(Schema::new(new_fields).with_metadata(current_schema.metadata().clone()));
+
     // safety: this shouldn't fail b/c we're creating a record batch where the columns all have
     // the correct length and their datatypes match the schema
-    let new_rb = RecordBatch::try_new(Arc::new(Schema::new(new_fields)), new_columns)
+    let new_rb = RecordBatch::try_new(new_schema, new_columns)
         .expect("unexpected error creating record batch with known schema");

     Ok(new_rb)
@@ -63,7 +78,7 @@ fn transform_struct_to_known_schema(
     num_rows: usize,
     current_array: &StructArray,
     template_fields: &Fields,
-) -> Result<StructArray, Error> {
+) -> Result<StructArray, ParquetExporterError> {
     let (new_columns, new_fields) = transform_to_known_schema_internal(
         num_rows,
         current_array.columns(),
@@ -83,7 +98,7 @@ fn transform_to_known_schema_internal(
     current_columns: &[ArrayRef],
     current_fields: &Fields,
     template_fields: &Fields,
-) -> Result<(Vec<ArrayRef>, Fields), Error> {
+) -> Result<(Vec<ArrayRef>, Fields), ParquetExporterError> {
     let mut new_columns = Vec::with_capacity(template_fields.len());
     let mut new_fields = Vec::with_capacity(template_fields.len());
@@ -97,16 +112,24 @@ fn transform_to_known_schema_internal(
         let current_column = &current_columns[current_field_index];
         let new_column = if let DataType::Struct(_) = current_field.data_type() {
             // handle struct column
+            let template_struct_fields = template_field
+                .as_struct_fields()
+                .ok_or(ParquetExporterError::InvalidRecordBatch {
+                    error: format!(
+                        "unexpected struct column found for field named '{}', expected type {}",
+                        template_field.name(),
+                        template_field.data_type()
+                    )
+                })?;
+
             let new_struct_arr = transform_struct_to_known_schema(
                 num_rows,
-                // safety: we've just checked the datatype
+                // safety: we've already checked the datatype above
                 current_column
                     .as_any()
                     .downcast_ref()
                     .expect("can downcast to struct"),
-                // TODO -- need to return an error here if this is None b/c it means that
-                // the record batch we received had a struct column where it shouldn't
-                template_field.as_struct_fields().unwrap(),
+                template_struct_fields,
             )?;

             Arc::new(new_struct_arr)
@@ -146,7 +169,10 @@ fn transform_to_known_schema_internal(
     Ok((new_columns, Fields::from(new_fields)))
 }

-fn get_all_null_column(data_type: &DataType, length: usize) -> Result<ArrayRef, Error> {
+fn get_all_null_column(
+    data_type: &DataType,
+    length: usize,
+) -> Result<ArrayRef, ParquetExporterError> {
     // TODO once run-end encoding is supported, we can save some memory here by allocating a
     // single RunArray with one null value, and one run-end of `length`.
     // This would allow us to allocate a few single-length buffers instead of full-length
     // empty buffers.
@@ -166,21 +192,26 @@ fn get_all_null_column(data_type: &DataType, length: usize) -> Result<ArrayRef, Error> {
         DataType::UInt32 => Arc::new(UInt32Array::new_null(length)),
         DataType::UInt64 => Arc::new(UInt64Array::new_null(length)),
         DataType::Utf8 => Arc::new(StringArray::new_null(length)),
-        DataType::Timestamp(time_unit, _) => match *time_unit {
-            TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::new_null(length)),
-            _ => {
-                todo!()
-            }
-        },
-
+        DataType::Duration(TimeUnit::Nanosecond) => {
+            Arc::new(DurationNanosecondArray::new_null(length))
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+            Arc::new(TimestampNanosecondArray::new_null(length))
+        }
+        DataType::List(field) => Arc::new(ListArray::new_null(field.clone(), length)),
         DataType::Struct(fields) => get_struct_full_of_nulls_or_defaults(fields, length, true)?,
         _ => {
-            todo!()
+            return Err(ParquetExporterError::InvalidRecordBatch {
+                error: format!("Could not generate all null column for type {data_type}"),
+            });
         }
     })
 }

-fn get_all_default_value_column(data_type: &DataType, length: usize) -> Result<ArrayRef, Error> {
+fn get_all_default_value_column(
+    data_type: &DataType,
+    length: usize,
+) -> Result<ArrayRef, ParquetExporterError> {
     // TODO once run-end encoding is supported, we can save some memory here by allocating a
     // single RunArray with one default value, and one run-end of `length`. This would allow us
     // to allocate a few single-length buffers instead of full-length empty buffers.
@@ -201,18 +232,25 @@ fn get_all_default_value_column(data_type: &DataType, length: usize) -> Result<ArrayRef, Error> {
         DataType::UInt32 => Arc::new(UInt32Array::from_iter_values(repeat_n(0, length))),
         DataType::UInt64 => Arc::new(UInt64Array::from_iter_values(repeat_n(0, length))),
         DataType::Utf8 => Arc::new(StringArray::from_iter(repeat_n(Some(""), length))),
-        DataType::Timestamp(time_unit, _) => match *time_unit {
-            TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from_iter_values(repeat_n(
-                0, length,
-            ))),
-            _ => {
-                todo!()
-            }
-        },
+        DataType::Duration(TimeUnit::Nanosecond) => Arc::new(
+            DurationNanosecondArray::from_iter_values(repeat_n(0, length)),
+        ),
+        DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
+            TimestampNanosecondArray::from_iter_values(repeat_n(0, length)),
+        ),
+
+        DataType::List(field) => Arc::new(ListArray::new(
+            field.clone(),
+            OffsetBuffer::new_zeroed(length),
+            get_all_default_value_column(field.data_type(), 0)?,
+            Some(NullBuffer::new_valid(length)),
+        )),
         DataType::Struct(fields) => get_struct_full_of_nulls_or_defaults(fields, length, false)?,
         _ => {
-            todo!()
+            return Err(ParquetExporterError::InvalidRecordBatch {
+                error: format!("Could not generate default value column for type {data_type}"),
+            });
         }
     })
 }
@@ -224,7 +262,7 @@ fn get_struct_full_of_nulls_or_defaults(
     fields: &Fields,
     length: usize,
     struct_nullable: bool,
-) -> Result<ArrayRef, Error> {
+) -> Result<ArrayRef, ParquetExporterError> {
     let mut new_fields = Vec::with_capacity(fields.len());
     let mut new_columns = Vec::with_capacity(fields.len());
@@ -238,33 +276,35 @@ fn get_struct_full_of_nulls_or_defaults(
         new_columns.push(new_column);
     }

-    let nulls = (!struct_nullable).then(|| NullBuffer::new_valid(length));
+    let nulls = struct_nullable.then(|| NullBuffer::new_null(length));
     let struct_array = StructArray::new(Fields::from(new_fields), new_columns, nulls);

     Ok(Arc::new(struct_array))
 }

-fn get_template_schema(payload_type: ArrowPayloadType) -> &'static Schema {
+fn get_template_schema(
+    payload_type: ArrowPayloadType,
+) -> Result<&'static Schema, ParquetExporterError> {
     match payload_type {
-        ArrowPayloadType::Logs => &LOGS_TEMPLATE_SCHEMA,
+        ArrowPayloadType::Logs => Ok(&LOGS_TEMPLATE_SCHEMA),
         ArrowPayloadType::UnivariateMetrics | ArrowPayloadType::MultivariateMetrics => {
-            &METRICS_TEMPLATE_SCHEMA
+            Ok(&METRICS_TEMPLATE_SCHEMA)
         }
-        ArrowPayloadType::SummaryDataPoints => &SUMMARY_DP_TEMPLATE_SCHEMA,
-        ArrowPayloadType::NumberDataPoints => &NUMBERS_DP_TEMPLATE_SCHEMA,
-        ArrowPayloadType::HistogramDataPoints => &HISTOGRAM_DP_TEMPLATE_SCHEMA,
-        ArrowPayloadType::ExpHistogramDataPoints => &EXP_HISTOGRAM_DP_TEMPLATE_SCHEMA,
+        ArrowPayloadType::SummaryDataPoints => Ok(&SUMMARY_DP_TEMPLATE_SCHEMA),
+        ArrowPayloadType::NumberDataPoints => Ok(&NUMBERS_DP_TEMPLATE_SCHEMA),
+        ArrowPayloadType::HistogramDataPoints => Ok(&HISTOGRAM_DP_TEMPLATE_SCHEMA),
+        ArrowPayloadType::ExpHistogramDataPoints => Ok(&EXP_HISTOGRAM_DP_TEMPLATE_SCHEMA),
         ArrowPayloadType::NumberDpExemplars
         | ArrowPayloadType::HistogramDpExemplars
-        | ArrowPayloadType::ExpHistogramDpExemplars => &EXEMPLAR_TEMPLATE_SCHEMA,
-        ArrowPayloadType::Spans => &SPANS_TEMPLATE_SCHEMA,
-        ArrowPayloadType::SpanLinks => &SPAN_LINKS_TEMPLATE_SCHEMA,
-        ArrowPayloadType::SpanEvents => &SPAN_EVENTS_TEMPLATE_SCHEMA,
+        | ArrowPayloadType::ExpHistogramDpExemplars => Ok(&EXEMPLAR_TEMPLATE_SCHEMA),
+        ArrowPayloadType::Spans => Ok(&SPANS_TEMPLATE_SCHEMA),
+        ArrowPayloadType::SpanLinks => Ok(&SPAN_LINKS_TEMPLATE_SCHEMA),
+        ArrowPayloadType::SpanEvents => Ok(&SPAN_EVENTS_TEMPLATE_SCHEMA),
         ArrowPayloadType::ResourceAttrs
         | ArrowPayloadType::ScopeAttrs
         | ArrowPayloadType::MetricAttrs
         | ArrowPayloadType::SpanAttrs
-        | ArrowPayloadType::LogAttrs => &ATTRS_16_TEMPLATE_SCHEMA,
+        | ArrowPayloadType::LogAttrs => Ok(&ATTRS_16_TEMPLATE_SCHEMA),
         ArrowPayloadType::SpanLinkAttrs
         | ArrowPayloadType::SpanEventAttrs
         | ArrowPayloadType::NumberDpAttrs
@@ -273,11 +313,10 @@ fn get_template_schema(payload_type: ArrowPayloadType) -> &'static Schema {
         | ArrowPayloadType::ExpHistogramDpAttrs
         | ArrowPayloadType::HistogramDpExemplarAttrs
         | ArrowPayloadType::NumberDpExemplarAttrs
-        | ArrowPayloadType::ExpHistogramDpExemplarAttrs => &ATTRS_32_TEMPLATE_SCHEMA,
-        ArrowPayloadType::Unknown => {
-            // TODO need to return error here?
-            todo!()
-        }
+        | ArrowPayloadType::ExpHistogramDpExemplarAttrs => Ok(&ATTRS_32_TEMPLATE_SCHEMA),
+        ArrowPayloadType::Unknown => Err(ParquetExporterError::InvalidRecordBatch {
+            error: "Cannot convert schema for OTAP Payload type Unknown".to_string(),
+        }),
     }
 }
@@ -330,7 +369,7 @@ static LOGS_TEMPLATE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
         false,
     ),
     Field::new(consts::TRACE_ID, DataType::FixedSizeBinary(16), true),
-    Field::new(consts::SPAN_ID, DataType::FixedSizeBinary(16), true),
+    Field::new(consts::SPAN_ID, DataType::FixedSizeBinary(8), true),
     Field::new(consts::SEVERITY_NUMBER, DataType::Int32, true),
     Field::new(consts::SEVERITY_TEXT, DataType::Utf8, true),
     Field::new(
@@ -622,15 +661,17 @@ static ATTRS_32_TEMPLATE_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
 #[cfg(test)]
 mod test {
     use super::*;
-    use arrow::array::{RecordBatch, RecordBatchOptions, UInt16Array};
+    use arrow::{
+        array::{DictionaryArray, RecordBatch, RecordBatchOptions, UInt16Array},
+        datatypes::UInt8Type,
+    };
     use pretty_assertions::assert_eq;

     #[test]
     fn test_coalesces_new_columns_with_empty_columns() {
         let log_attrs_record_batch = RecordBatch::try_new(
             Arc::new(Schema::new(vec![
-                Field::new(consts::ID, DataType::UInt16, false),
-                // "resource" is missing, so we should insert a new non-nullable struct
+                // scope:
                 Field::new(
                     consts::SCOPE,
                     DataType::Struct(Fields::from(vec![
                         Field::new(consts::ID, DataType::UInt16, true),
                     ])),
                     false,
                 ),
-                // TODO fill in other columns:
-                // 1 - default null/values on FSL
-                // 2 - nullable struct (body?)
-                // 3 - keeps dicts
-                // ...
+                // add the ID column after the scope column to ensure it corrects the column order
+                Field::new(consts::ID, DataType::UInt16, false),
+                // "resource" is missing, so we should insert a new non-nullable struct
+                // also the other columns are missing, so we assert the default/null values are added
             ])),
             vec![
                 Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])),
                 Arc::new(StructArray::new(
                     Fields::from(vec![Field::new(consts::ID, DataType::UInt16, true)]),
                     vec![Arc::new(UInt16Array::from_iter(vec![
                         Some(0),
                         None,
                         Some(1),
                     ]))],
                     Some(NullBuffer::new_valid(3)),
                 )),
             ],
         )
         .unwrap();

         let result =
-            transform_to_known_schema(&log_attrs_record_batch, ArrowPayloadType::Logs).unwrap();
+            transform_record_batch_to_known_schema(&log_attrs_record_batch, ArrowPayloadType::Logs)
+                .unwrap();

         let expected_resource_fields = Fields::from(vec![
             Field::new(consts::ID, DataType::UInt16, true),
@@ -677,6 +718,16 @@ mod test {
             Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, true),
         ]);

+        let expected_body_fields = Fields::from(vec![
+            Field::new(consts::ATTRIBUTE_TYPE, DataType::UInt8, true),
+            Field::new(consts::ATTRIBUTE_STR, DataType::Utf8, true),
+            Field::new(consts::ATTRIBUTE_INT, DataType::Int64, true),
+            Field::new(consts::ATTRIBUTE_DOUBLE, DataType::Float64, true),
+            Field::new(consts::ATTRIBUTE_BOOL, DataType::Boolean, true),
+            Field::new(consts::ATTRIBUTE_BYTES, DataType::Binary, true),
+            Field::new(consts::ATTRIBUTE_SER, DataType::Binary, true),
+        ]);
+
         let expected = RecordBatch::try_new(
             Arc::new(Schema::new(vec![
                 Field::new(consts::ID, DataType::UInt16, false),
                 Field::new(
                     consts::RESOURCE,
                     DataType::Struct(expected_resource_fields.clone()),
                     false,
                 ),
                 Field::new(
                     consts::SCOPE,
                     DataType::Struct(expected_scope_fields.clone()),
                     false,
                 ),
+                Field::new(consts::SCHEMA_URL, DataType::Utf8, false),
+                Field::new(
+                    consts::TIME_UNIX_NANO,
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                    false,
+                ),
+                Field::new(
+                    consts::OBSERVED_TIME_UNIX_NANO,
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                    false,
+                ),
+                Field::new(consts::TRACE_ID, DataType::FixedSizeBinary(16), true),
+                Field::new(consts::SPAN_ID, DataType::FixedSizeBinary(8), true),
+                Field::new(consts::SEVERITY_NUMBER, DataType::Int32, true),
+                Field::new(consts::SEVERITY_TEXT, DataType::Utf8, true),
+                Field::new(
+                    consts::BODY,
+                    DataType::Struct(expected_body_fields.clone()),
+                    true,
+                ),
+                Field::new(consts::DROPPED_ATTRIBUTES_COUNT, DataType::UInt32, false),
+                Field::new(consts::FLAGS, DataType::UInt32, false),
             ])),
             vec![
+                // ensure it kept the original IDs column:
                 Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])),
                 Arc::new(StructArray::new(
                     expected_resource_fields.clone(),
                     vec![
                         Arc::new(StringArray::new_null(3)),
                         Arc::new(UInt32Array::new_null(3)),
                     ],
+                    // ensure that for non-null structs, it creates an all valid null buffer
                     Some(NullBuffer::new_valid(3)),
                 )),
                 Arc::new(StructArray::new(
                     expected_scope_fields.clone(),
                     vec![
+                        // ensure it keeps the original nested struct column:
                         Arc::new(UInt16Array::from_iter(vec![Some(0), None, Some(1)])),
                         Arc::new(StringArray::new_null(3)),
                         Arc::new(StringArray::new_null(3)),
                     ],
                     Some(NullBuffer::new_valid(3)),
                 )),
+                Arc::new(StringArray::from_iter_values(repeat_n("", 3))),
+                Arc::new(TimestampNanosecondArray::from_iter_values(repeat_n(0, 3))),
+                Arc::new(TimestampNanosecondArray::from_iter_values(repeat_n(0, 3))),
+                Arc::new(FixedSizeBinaryArray::new_null(16, 3)),
+                Arc::new(FixedSizeBinaryArray::new_null(8, 3)),
+                Arc::new(Int32Array::new_null(3)),
+                Arc::new(StringArray::new_null(3)),
+                Arc::new(StructArray::new(
+                    expected_body_fields.clone(),
+                    vec![
+                        Arc::new(UInt8Array::new_null(3)),
+                        Arc::new(StringArray::new_null(3)),
+                        Arc::new(Int64Array::new_null(3)),
+                        Arc::new(Float64Array::new_null(3)),
+                        Arc::new(BooleanArray::new_null(3)),
+                        Arc::new(BinaryArray::new_null(3)),
+                        Arc::new(BinaryArray::new_null(3)),
+                    ],
+                    // expect that for nullable structs, it creates a null buffer of all nulls
+                    Some(NullBuffer::new_null(3)),
+                )),
+                Arc::new(UInt32Array::from_iter_values(repeat_n(0, 3))),
+                Arc::new(UInt32Array::from_iter_values(repeat_n(0, 3))),
             ],
         )
         .unwrap();
@@ -719,6 +818,69 @@ mod test {
         assert_eq!(result, expected)
     }

+    #[test]
+    fn test_list_nullability() {
+        // the histogram data points template schema just happens to have both a nullable and a
+        // non-nullable list, so we can use this type to check the behaviour of filling in null
+        // list columns:
+        let histogram_dp_rb = RecordBatch::try_new_with_options(
+            Arc::new(Schema::empty()),
+            vec![],
+            &RecordBatchOptions::new().with_row_count(Some(3)),
+        )
+        .unwrap();
+
+        let result = transform_record_batch_to_known_schema(
+            &histogram_dp_rb,
+            ArrowPayloadType::HistogramDataPoints,
+        )
+        .unwrap();
+
+        let hist_bucket_counts_col = result
+            .column_by_name(consts::HISTOGRAM_BUCKET_COUNTS)
+            .unwrap()
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .unwrap();
+
+        let hist_bounds_col = result
+            .column_by_name(consts::HISTOGRAM_EXPLICIT_BOUNDS)
+            .unwrap()
+            .as_any()
+            .downcast_ref::<ListArray>()
+            .unwrap();
+
+        for i in 0..3 {
+            assert!(hist_bucket_counts_col.is_valid(i));
+            assert_eq!(hist_bucket_counts_col.value(i).len(), 0);
+            assert!(hist_bounds_col.is_null(i));
+        }
+    }
+
+    #[test]
+    fn test_keeps_original_datatype_for_dicts() {
+        let expected_data_type =
+            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8));
+        let attr_record_batch = RecordBatch::try_new(
+            Arc::new(Schema::new(vec![Field::new(
+                consts::ATTRIBUTE_STR,
+                expected_data_type.clone(),
+                true,
+            )])),
+            vec![Arc::new(DictionaryArray::<UInt8Type>::new(
+                UInt8Array::from_iter_values([0]),
+                Arc::new(StringArray::from_iter_values(["a"])),
+            ))],
+        )
+        .unwrap();

+        let result =
+            transform_record_batch_to_known_schema(&attr_record_batch, ArrowPayloadType::LogAttrs)
+                .unwrap();
+
+        let val_str_column = result.column_by_name(consts::ATTRIBUTE_STR).unwrap();
+        assert_eq!(val_str_column.data_type(), &expected_data_type);
+    }
+
     #[test]
     fn test_generate_all_null_record_batch() {
         // this is just a smoke test to ensure that all of our null/empty column generating code
@@ -758,7 +920,7 @@ mod test {
             &RecordBatchOptions::new().with_row_count(Some(1)),
         )
         .unwrap();
-        _ = transform_to_known_schema(&input, payload_type).unwrap();
+        _ = transform_record_batch_to_known_schema(&input, payload_type).unwrap();
     }
 }

From a7b6965a007471aa107967eeb82ec343997a821d Mon Sep 17 00:00:00 2001
From: albertlockett
Date: Thu, 28 Aug 2025 12:16:49 -0400
Subject: [PATCH 7/7] fix the broken test

---
 rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs
index 7d83b4207..14653a43d 100644
--- a/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs
+++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter/schema.rs
@@ -687,7 +687,7 @@ mod test {
                 // also the other columns are missing, so we assert the default/null values are added
             ])),
             vec![
-                Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])),
                 Arc::new(StructArray::new(
                     Fields::from(vec![Field::new(consts::ID, DataType::UInt16, true)]),
                     vec![Arc::new(UInt16Array::from_iter(vec![
                         Some(0),
                         None,
                         Some(1),
                     ]))],
                     Some(NullBuffer::new_valid(3)),
                 )),
+                Arc::new(UInt16Array::from_iter_values(vec![1, 2, 3])),
             ],
         )
         .unwrap();
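For reference, below is a minimal standalone sketch of the core technique this series implements: keep the columns a batch already has (including any dictionary-encoded variants), and synthesize all-null placeholders for optional template fields the batch lacks. The `pad_to_template` helper and its two covered data types are illustrative assumptions, not this crate's API:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};

/// Illustrative only: pad `batch` with all-null columns for every optional
/// template field it lacks, so batches that arrived with different optional
/// columns end up sharing one schema.
fn pad_to_template(batch: &RecordBatch, template: &Schema) -> RecordBatch {
    let num_rows = batch.num_rows();
    let mut fields: Vec<Field> = Vec::with_capacity(template.fields().len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(template.fields().len());

    for tf in template.fields() {
        match batch.schema_ref().index_of(tf.name()) {
            // keep the existing column and its actual type (which may be a
            // dictionary encoding of the template type)
            Ok(i) => {
                fields.push(batch.schema_ref().field(i).clone());
                columns.push(batch.column(i).clone());
            }
            // missing column: synthesize an all-null placeholder
            Err(_) => {
                debug_assert!(tf.is_nullable(), "sketch only handles optional fields");
                let col: ArrayRef = match tf.data_type() {
                    DataType::Utf8 => Arc::new(StringArray::new_null(num_rows)),
                    DataType::Int64 => Arc::new(Int64Array::new_null(num_rows)),
                    dt => unimplemented!("sketch covers only Utf8 and Int64, got {dt}"),
                };
                fields.push(tf.as_ref().clone());
                columns.push(col);
            }
        }
    }

    // the columns were built to match `fields`, so this construction succeeds
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
        .expect("columns match the schema built above")
}
```

The real transform in the diff above goes further: it recurses into struct columns, fills non-nullable missing fields with default values instead of nulls, and preserves the schema metadata that partitioning relies on.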