1 change: 1 addition & 0 deletions rust/otap-dataflow/Cargo.toml
@@ -63,6 +63,7 @@ log = "0.4"
criterion = "0.7.0"
miette = { version="7.6.0", features = ["fancy"] }
linkme = "0.3.33"
pretty_assertions = "1.4.1"
proc-macro2 = "1.0"
quote = "1.0"
syn = { version = "2.0", features = ["full"] }
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/otap/Cargo.toml
@@ -61,5 +61,6 @@ weaver_common.workspace = true

[dev-dependencies]
portpicker = "0.1.1"
pretty_assertions = "1.4.1"
tempfile.workspace = true
url.workspace = true
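Both manifests add `pretty_assertions` as a dev-dependency. It is a drop-in replacement for the standard assertion macros that prints a colored, line-by-line diff on failure, which matters here because the new test compares large Arrow schemas. A minimal sketch of the usage pattern (the test values are illustrative):

```rust
#[cfg(test)]
mod tests {
    // Shadow the std macro; call sites stay unchanged.
    use pretty_assertions::assert_eq;

    #[test]
    fn schemas_match() {
        let expected = vec!["id", "severity_text", "body"];
        let actual = vec!["id", "severity_text", "body"];
        // On failure this renders a readable field-by-field diff
        // instead of one long Debug dump.
        assert_eq!(expected, actual);
    }
}
```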
73 changes: 73 additions & 0 deletions rust/otap-dataflow/crates/otap/src/parquet_exporter.rs
@@ -20,6 +20,7 @@
//! See the [GitHub issue](https://github.com/open-telemetry/otel-arrow/issues/399) for more details.

use crate::OTAP_EXPORTER_FACTORIES;
use crate::parquet_exporter::schema::transform_to_known_schema;
use crate::pdata::OtapPdata;
use std::io::ErrorKind;
use std::sync::Arc;
@@ -45,9 +46,11 @@ use otap_df_engine::node::NodeId;
use otel_arrow_rust::otap::OtapArrowRecords;

mod config;
mod error;
mod idgen;
mod object_store;
mod partition;
mod schema;
mod writer;

#[allow(dead_code)]
@@ -189,6 +192,15 @@ impl Exporter<OtapPdata> for ParquetExporter {
});
}

// ensure the batch has the schema the parquet writer expects
transform_to_known_schema(&mut otap_batch).map_err(|e| {
// TODO - Ack/Nack instead of returning error
Error::ExporterError {
exporter: effect_handler.exporter_id(),
error: format!("Schema transformation failed: {e}"),
}
})?;

// compute any partitions
let partitions = match self.config.partitioning_strategies.as_ref() {
Some(strategies) => partition(&otap_batch, strategies),
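The `schema` module that provides `transform_to_known_schema` is not shown in this diff, but the general shape of such a normalization pass is to project each incoming batch onto the schema the Parquet writer expects. A sketch of that technique, assuming nullable target fields; `project_to_schema` is illustrative, not the PR's actual code:

```rust
use arrow::array::{new_null_array, ArrayRef};
use arrow::compute::cast;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Project `batch` onto `target`: cast columns that exist to the expected
/// type and fill columns the batch lacks with nulls (assumes those target
/// fields are nullable).
fn project_to_schema(batch: &RecordBatch, target: SchemaRef) -> Result<RecordBatch, ArrowError> {
    let columns: Result<Vec<ArrayRef>, ArrowError> = target
        .fields()
        .iter()
        .map(|field| match batch.schema().index_of(field.name()) {
            Ok(idx) => cast(batch.column(idx), field.data_type()),
            Err(_) => Ok(new_null_array(field.data_type(), batch.num_rows())),
        })
        .collect();
    RecordBatch::try_new(target, columns?)
}
```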
Expand Down Expand Up @@ -409,6 +421,67 @@ mod test {
});
}

#[test]
fn test_adaptive_schema_optional_columns() {
let test_runtime = TestRuntime::<OtapPdata>::new();
let temp_dir = tempfile::tempdir().unwrap();
let base_dir: String = temp_dir.path().to_str().unwrap().into();
let exporter = ParquetExporter::new(config::Config {
base_uri: base_dir.clone(),
partitioning_strategies: None,
writer_options: None,
});
let node_config = Arc::new(NodeUserConfig::new_exporter_config(PARQUET_EXPORTER_URN));
let exporter = ExporterWrapper::<OtapPdata>::local::<ParquetExporter>(
exporter,
test_node(test_runtime.config().name.clone()),
node_config,
test_runtime.config(),
);

test_runtime
.set_exporter(exporter)
.run_test(move |ctx| {
Box::pin(async move {
let batch1: OtapArrowRecords =
fixtures::create_single_logs_pdata_with_attrs(vec![KeyValue {
key: "strkey".to_string(),
value: Some(AnyValue::new_string("terry")),
}])
.try_into()
.unwrap();

let batch2: OtapArrowRecords =
fixtures::create_single_logs_pdata_with_attrs(vec![KeyValue {
key: "intkey".to_string(),
value: Some(AnyValue::new_int(418)),
}])
.try_into()
.unwrap();

// sanity-check that the two batches produced different schemas
let batch1_attrs = batch1.get(ArrowPayloadType::LogAttrs).unwrap();
let batch2_attrs = batch2.get(ArrowPayloadType::LogAttrs).unwrap();
assert_ne!(batch1_attrs.schema(), batch2_attrs.schema());

ctx.send_pdata(batch1.into()).await.unwrap();
ctx.send_pdata(batch2.into()).await.unwrap();

ctx.send_shutdown(Duration::from_millis(200), "test completed")
.await
.unwrap();
})
})
.run_validation(move |_ctx, exporter_result| {
Box::pin(async move {
// check no error
exporter_result.unwrap();
assert_parquet_file_has_rows(&base_dir, ArrowPayloadType::Logs, 2).await;
assert_parquet_file_has_rows(&base_dir, ArrowPayloadType::LogAttrs, 2).await;
})
})
}

async fn wait_table_exists(base_dir: &str, payload_type: ArrowPayloadType) {
let table_name = payload_type.as_str_name().to_lowercase();
loop {
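`assert_parquet_file_has_rows` is a helper defined elsewhere in this test module. A check in that spirit can be driven entirely by the Parquet footer metadata; this sketch is illustrative, not the PR's helper:

```rust
use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

/// Read the row count recorded in a Parquet file's footer without
/// deserializing any row groups.
fn parquet_row_count(path: &str) -> i64 {
    let file = File::open(path).expect("parquet file should exist");
    let reader = SerializedFileReader::new(file).expect("valid parquet footer");
    reader.metadata().file_metadata().num_rows()
}
```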
12 changes: 12 additions & 0 deletions rust/otap-dataflow/crates/otap/src/parquet_exporter/error.rs
@@ -0,0 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

/// Definition of errors that could happen when exporting OTAP batches to Parquet
#[derive(thiserror::Error, Debug)]
pub enum ParquetExporterError {
#[error("Invalid record batch: {error}")]
InvalidRecordBatch { error: String },

#[error("Unknown error occurred: {error}")]
UnknownError { error: String },
}
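Moving the enum into a shared `error` module lets every stage of the export path (ID generation, schema transformation, writing) surface the same error shape. An illustrative sketch of a caller; `check_batch` is hypothetical and not part of the PR:

```rust
use arrow::record_batch::RecordBatch;

use crate::parquet_exporter::error::ParquetExporterError;

// Hypothetical validation step that reports failures through the shared enum.
fn check_batch(batch: &RecordBatch) -> Result<(), ParquetExporterError> {
    if batch.num_rows() == 0 {
        return Err(ParquetExporterError::InvalidRecordBatch {
            error: "record batch is empty".to_string(),
        });
    }
    Ok(())
}
```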
20 changes: 6 additions & 14 deletions rust/otap-dataflow/crates/otap/src/parquet_exporter/idgen.rs
@@ -16,15 +16,7 @@ use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType;
use otel_arrow_rust::schema::{consts, update_schema_metadata};
use uuid::Uuid;

-/// Definition of errors that could happen when generating an ID
-#[derive(thiserror::Error, Debug)]
-pub enum IdGeneratorError {
-#[error("Invalid record batch: {error}")]
-InvalidRecordBatch { error: String },
-
-#[error("Unknown error occurred: {error}")]
-UnknownError { error: String },
-}
+use super::error::ParquetExporterError;

/// This is an ID generator that converts the batch IDs (which are only unique within a given
/// OTAP Batch) into IDs that can be treated as unique.
@@ -61,12 +53,12 @@ impl PartitionSequenceIdGenerator {
pub fn generate_unique_ids(
&mut self,
otap_batch: &mut OtapArrowRecords,
-) -> Result<(), IdGeneratorError> {
+) -> Result<(), ParquetExporterError> {
// decode the transport optimized IDs if they are not already decoded. This is needed
// to ensure that we are not computing the sequence of IDs based on delta-encoded IDs.
// If the IDs are already decoded, this is a no-op.
otap_batch.decode_transport_optimized_ids().map_err(|e| {
-IdGeneratorError::InvalidRecordBatch {
+ParquetExporterError::InvalidRecordBatch {
error: format!("failed to decode transport optimized IDs: {e}"),
}
})?;
@@ -157,7 +149,7 @@ impl PartitionSequenceIdGenerator {
&self,
column_name: &str,
record_batch: &RecordBatch,
-) -> Result<RecordBatch, IdGeneratorError> {
+) -> Result<RecordBatch, ParquetExporterError> {
let schema = record_batch.schema_ref();
let id_column_index = match schema.index_of(column_name) {
Ok(index) => index,
@@ -172,15 +164,15 @@
let id_column = match cast(id_column, &DataType::UInt32) {
Ok(id_col) => id_col,
Err(err) => {
-return Err(IdGeneratorError::InvalidRecordBatch {
+return Err(ParquetExporterError::InvalidRecordBatch {
error: format!("could not cast ID column to UInt32: {err}"),
});
}
};
let new_id_col = match add(&UInt32Array::new_scalar(self.curr_max), &id_column) {
Ok(col) => col,
Err(e) => {
-return Err(IdGeneratorError::UnknownError {
+return Err(ParquetExporterError::UnknownError {
error: format!("{e}"),
});
}
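The renamed error type aside, the core of `PartitionSequenceIdGenerator` is the cast-then-offset pattern visible in the hunks above: widen the per-batch ID column to `UInt32`, then add a running offset with Arrow's scalar kernel so IDs stay unique across batches. A self-contained sketch of that pattern (`offset_ids` is illustrative; the real logic lives in the generator):

```rust
use arrow::array::{ArrayRef, UInt32Array};
use arrow::compute::cast;
use arrow::compute::kernels::numeric::add;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

/// Shift a batch-local ID column by `offset` so it no longer collides with
/// IDs handed out for earlier batches in the same partition.
fn offset_ids(ids: &ArrayRef, offset: u32) -> Result<ArrayRef, ArrowError> {
    // Widen first so adding the offset cannot overflow the original ID width.
    let widened = cast(ids, &DataType::UInt32)?;
    // Scalar + array addition, mirroring the add() call in the diff above.
    add(&UInt32Array::new_scalar(offset), &widened)
}
```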