
Commit fc324c5

Parquet exporter: handle optional fields (#1024)
part of #863

Because some OTAP fields are optional, a stream of record batches may contain subsequent batches with different schemas. Parquet doesn't support row groups with different sets of column chunks, which means we need to know the schema a priori when the writer is created.

This PR adds code to normalize the schema of each record batch before writing by:
- putting all the fields in the same order
- creating all-null/default-value columns for any missing column

The missing columns should add only a small overhead on disk: for an all-null column, parquet writes an essentially empty column chunk (just the null count, no data), and for an all-default-value column, parquet uses dictionary and RLE encoding by default, producing a small column chunk with a single value in the dictionary and a single run for the key.

What's unfortunate is that we still materialize an all-null column, with the length of the record batch, before writing. This can be optimized once run-end encoded arrays are supported in parquet, because we could then create a run array with a single run of the null/default value. The arrow community is currently working on adding support (see apache/arrow-rs#7713 & apache/arrow-rs#8069).

---------

Co-authored-by: Laurent Quérel <[email protected]>
1 parent 0d3422f commit fc324c5
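
As an illustration of the normalization described above, here is a minimal sketch using plain arrow-rs APIs. `normalize_batch` and its target-schema argument are illustrative stand-ins for this PR's `transform_to_known_schema`, not its actual implementation (the default-value case is omitted):

```rust
use arrow::array::{ArrayRef, new_null_array};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Reorder a batch's columns to match `target`, filling any missing
/// field with an all-null column of the batch's length.
fn normalize_batch(batch: &RecordBatch, target: SchemaRef) -> Result<RecordBatch, ArrowError> {
    let columns: Vec<ArrayRef> = target
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            // Field present: reuse the column (iterating the target's
            // fields also fixes the column ordering).
            Some(col) => col.clone(),
            // Field missing: materialize an all-null column, which parquet
            // later writes as a near-empty column chunk.
            None => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();
    RecordBatch::try_new(target, columns)
}
```

Missing fields must be declared nullable in the target schema for the all-null fill to be valid.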

File tree

7 files changed: +1032, -15 lines

rust/otap-dataflow/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -63,6 +63,7 @@ log = "0.4"
 criterion = "0.7.0"
 miette = { version="7.6.0", features = ["fancy"] }
 linkme = "0.3.33"
+pretty_assertions = "1.4.1"
 proc-macro2 = "1.0"
 quote = "1.0"
 syn = { version = "2.0", features = ["full"] }

rust/otap-dataflow/crates/otap/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -61,5 +61,6 @@ weaver_common.workspace = true
 
 [dev-dependencies]
 portpicker = "0.1.1"
+pretty_assertions = "1.4.1"
 tempfile.workspace = true
 url.workspace = true

rust/otap-dataflow/crates/otap/src/parquet_exporter.rs

Lines changed: 73 additions & 0 deletions
@@ -20,6 +20,7 @@
 //! See the [GitHub issue](https://github.com/open-telemetry/otel-arrow/issues/399) for more details.
 
 use crate::OTAP_EXPORTER_FACTORIES;
+use crate::parquet_exporter::schema::transform_to_known_schema;
 use crate::pdata::OtapPdata;
 use std::io::ErrorKind;
 use std::sync::Arc;
@@ -45,9 +46,11 @@ use otap_df_engine::node::NodeId;
 use otel_arrow_rust::otap::OtapArrowRecords;
 
 mod config;
+mod error;
 mod idgen;
 mod object_store;
 mod partition;
+mod schema;
 mod writer;
 
 #[allow(dead_code)]
@@ -189,6 +192,15 @@ impl Exporter<OtapPdata> for ParquetExporter {
                 });
             }
 
+            // ensure the batch has the schema the parquet writer expects
+            transform_to_known_schema(&mut otap_batch).map_err(|e| {
+                // TODO - Ack/Nack instead of returning error
+                Error::ExporterError {
+                    exporter: effect_handler.exporter_id(),
+                    error: format!("Schema transformation failed: {e}"),
+                }
+            })?;
+
             // compute any partitions
             let partitions = match self.config.partitioning_strategies.as_ref() {
                 Some(strategies) => partition(&otap_batch, strategies),
@@ -409,6 +421,67 @@ mod test {
             });
         }
 
+    #[test]
+    fn test_adaptive_schema_optional_columns() {
+        let test_runtime = TestRuntime::<OtapPdata>::new();
+        let temp_dir = tempfile::tempdir().unwrap();
+        let base_dir: String = temp_dir.path().to_str().unwrap().into();
+        let exporter = ParquetExporter::new(config::Config {
+            base_uri: base_dir.clone(),
+            partitioning_strategies: None,
+            writer_options: None,
+        });
+        let node_config = Arc::new(NodeUserConfig::new_exporter_config(PARQUET_EXPORTER_URN));
+        let exporter = ExporterWrapper::<OtapPdata>::local::<ParquetExporter>(
+            exporter,
+            test_node(test_runtime.config().name.clone()),
+            node_config,
+            test_runtime.config(),
+        );
+
+        test_runtime
+            .set_exporter(exporter)
+            .run_test(move |ctx| {
+                Box::pin(async move {
+                    let batch1: OtapArrowRecords =
+                        fixtures::create_single_logs_pdata_with_attrs(vec![KeyValue {
+                            key: "strkey".to_string(),
+                            value: Some(AnyValue::new_string("terry")),
+                        }])
+                        .try_into()
+                        .unwrap();
+
+                    let batch2: OtapArrowRecords =
+                        fixtures::create_single_logs_pdata_with_attrs(vec![KeyValue {
+                            key: "intkey".to_string(),
+                            value: Some(AnyValue::new_int(418)),
+                        }])
+                        .try_into()
+                        .unwrap();
+
+                    // double check that these contain schemas that are not the same ...
+                    let batch1_attrs = batch1.get(ArrowPayloadType::LogAttrs).unwrap();
+                    let batch2_attrs = batch2.get(ArrowPayloadType::LogAttrs).unwrap();
+                    assert_ne!(batch1_attrs.schema(), batch2_attrs.schema());
+
+                    ctx.send_pdata(batch1.into()).await.unwrap();
+                    ctx.send_pdata(batch2.into()).await.unwrap();
+
+                    ctx.send_shutdown(Duration::from_millis(200), "test completed")
+                        .await
+                        .unwrap();
+                })
+            })
+            .run_validation(move |_ctx, exporter_result| {
+                Box::pin(async move {
+                    // check no error
+                    exporter_result.unwrap();
+                    assert_parquet_file_has_rows(&base_dir, ArrowPayloadType::Logs, 2).await;
+                    assert_parquet_file_has_rows(&base_dir, ArrowPayloadType::LogAttrs, 2).await;
+                })
+            })
+    }
+
     async fn wait_table_exists(base_dir: &str, payload_type: ArrowPayloadType) {
         let table_name = payload_type.as_str_name().to_lowercase();
         loop {
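
The `transform_to_known_schema` call above runs before partitioning because, as the commit message notes, the parquet writer's schema is fixed when the writer is created. A minimal sketch of that constraint using the `parquet` crate's `ArrowWriter`; the file path and one-field schema are illustrative:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The writer's schema is fixed at creation time ...
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, true)]));
    let mut writer = ArrowWriter::try_new(File::create("example.parquet")?, schema.clone(), None)?;

    // ... so every batch written afterwards must match it, which is why
    // incoming batches are normalized before reaching this point.
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(UInt32Array::from(vec![Some(1), None])) as ArrayRef],
    )?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```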
rust/otap-dataflow/crates/otap/src/parquet_exporter/error.rs

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+/// Definition of errors that could happen when exporting OTAP batches to Parquet
+#[derive(thiserror::Error, Debug)]
+pub enum ParquetExporterError {
+    #[error("Invalid record batch: {error}")]
+    InvalidRecordBatch { error: String },
+
+    #[error("Unknown error occurred: {error}")]
+    UnknownError { error: String },
+}
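
A hypothetical usage snippet for the new shared error type: `thiserror` derives the `Display` implementation from the `#[error(...)]` attributes, so the expected message below follows directly from the attribute on `InvalidRecordBatch`:

```rust
use crate::parquet_exporter::error::ParquetExporterError;

#[test]
fn display_includes_context() {
    let err = ParquetExporterError::InvalidRecordBatch {
        error: "missing id column".to_string(),
    };
    // thiserror generates this Display impl from the #[error(...)] attribute.
    assert_eq!(err.to_string(), "Invalid record batch: missing id column");
}
```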

rust/otap-dataflow/crates/otap/src/parquet_exporter/idgen.rs

Lines changed: 6 additions & 14 deletions
@@ -16,15 +16,7 @@ use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType;
 use otel_arrow_rust::schema::{consts, update_schema_metadata};
 use uuid::Uuid;
 
-/// Definition of errors that could happen when generating an ID
-#[derive(thiserror::Error, Debug)]
-pub enum IdGeneratorError {
-    #[error("Invalid record batch: {error}")]
-    InvalidRecordBatch { error: String },
-
-    #[error("Unknown error occurred: {error}")]
-    UnknownError { error: String },
-}
+use super::error::ParquetExporterError;
 
 /// This is an ID generator that converts the batch IDs (which are only unique within a given
 /// OTAP Batch) into IDs that can be treated as unique.
@@ -61,12 +53,12 @@ impl PartitionSequenceIdGenerator {
     pub fn generate_unique_ids(
         &mut self,
         otap_batch: &mut OtapArrowRecords,
-    ) -> Result<(), IdGeneratorError> {
+    ) -> Result<(), ParquetExporterError> {
         // decode the transport optimized IDs if they are not already decoded. This is needed
         // to ensure that we are not computing the sequence of IDs based on delta-encoded IDs.
         // If the IDs are already decoded, this is a no-op.
         otap_batch.decode_transport_optimized_ids().map_err(|e| {
-            IdGeneratorError::InvalidRecordBatch {
+            ParquetExporterError::InvalidRecordBatch {
                 error: format!("failed to decode transport optimized IDs: {e}"),
             }
         })?;
@@ -157,7 +149,7 @@
         &self,
         column_name: &str,
         record_batch: &RecordBatch,
-    ) -> Result<RecordBatch, IdGeneratorError> {
+    ) -> Result<RecordBatch, ParquetExporterError> {
         let schema = record_batch.schema_ref();
         let id_column_index = match schema.index_of(column_name) {
             Ok(index) => index,
@@ -172,15 +164,15 @@
         let id_column = match cast(id_column, &DataType::UInt32) {
             Ok(id_col) => id_col,
             Err(err) => {
-                return Err(IdGeneratorError::InvalidRecordBatch {
+                return Err(ParquetExporterError::InvalidRecordBatch {
                     error: format!("could not cast ID column to UInt32: {err}"),
                 });
             }
         };
         let new_id_col = match add(&UInt32Array::new_scalar(self.curr_max), &id_column) {
             Ok(col) => col,
             Err(e) => {
-                return Err(IdGeneratorError::UnknownError {
+                return Err(ParquetExporterError::UnknownError {
                     error: format!("{e}"),
                 });
             }