1 change: 1 addition & 0 deletions rust/otap-dataflow/Cargo.toml
@@ -63,6 +63,7 @@ log = "0.4"
criterion = "0.7.0"
miette = { version="7.6.0", features = ["fancy"] }
linkme = "0.3.33"
pretty_assertions = "1.4.1"
proc-macro2 = "1.0"
quote = "1.0"
syn = { version = "2.0", features = ["full"] }
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/otap/Cargo.toml
@@ -61,5 +61,6 @@ weaver_common.workspace = true

[dev-dependencies]
portpicker = "0.1.1"
pretty_assertions = "1.4.1"
tempfile.workspace = true
url.workspace = true
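
Note on the new dev-dependency: `pretty_assertions` is a drop-in replacement for the standard `assert_eq!`/`assert_ne!` macros that prints a colored, line-by-line diff on failure, which helps when comparing large values such as the Arrow schemas asserted in the test below. A minimal usage sketch; the struct here is a hypothetical stand-in, not part of this PR:

```rust
// Shadow the std macro with the diff-printing version; this is the
// standard usage pattern for the pretty_assertions crate.
use pretty_assertions::assert_ne;

#[derive(Debug, PartialEq)]
struct SchemaLike {
    // hypothetical stand-in for an Arrow schema's field list
    fields: Vec<String>,
}

#[test]
fn prints_a_readable_diff_on_failure() {
    let a = SchemaLike { fields: vec!["strkey".into()] };
    let b = SchemaLike { fields: vec!["intkey".into()] };
    // On failure this prints a field-by-field colored diff instead of
    // dumping both Debug representations on a single line.
    assert_ne!(a, b);
}
```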
73 changes: 73 additions & 0 deletions rust/otap-dataflow/crates/otap/src/parquet_exporter.rs
@@ -20,6 +20,7 @@
//! See the [GitHub issue](https://github.com/open-telemetry/otel-arrow/issues/399) for more details.

use crate::OTAP_EXPORTER_FACTORIES;
use crate::parquet_exporter::schema::transform_to_known_schema;
use crate::pdata::OtapPdata;
use std::io::ErrorKind;
use std::sync::Arc;
@@ -45,9 +46,11 @@ use otap_df_engine::node::NodeId;
use otel_arrow_rust::otap::OtapArrowRecords;

mod config;
mod error;
mod idgen;
mod object_store;
mod partition;
mod schema;
mod writer;

#[allow(dead_code)]
@@ -189,6 +192,15 @@ impl Exporter<OtapPdata> for ParquetExporter {
});
}

// ensure the batch has the schema the parquet writer expects
transform_to_known_schema(&mut otap_batch).map_err(|e| {
// TODO - Ack/Nack instead of returning error
Error::ExporterError {
exporter: effect_handler.exporter_id(),
error: format!("Schema transformation failed: {e}"),
}
})?;

// compute any partitions
let partitions = match self.config.partitioning_strategies.as_ref() {
Some(strategies) => partition(&otap_batch, strategies),
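
The `transform_to_known_schema` call comes from the new `schema` module, whose contents are not shown in this diff. Conceptually, normalizing a batch to a writer's expected schema typically means keeping columns that already exist, back-filling missing optional columns with all-null arrays, and ordering fields to match a fixed target. A hedged sketch of that general technique with arrow-rs; the function name, target-schema handling, and error text are assumptions, not the actual module contents:

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Hypothetical normalization pass: project `batch` onto `target`,
/// filling absent optional columns with all-null arrays so every
/// batch handed to the writer shares one schema.
fn normalize_to_target(
    batch: &RecordBatch,
    target: &SchemaRef,
) -> Result<RecordBatch, ArrowError> {
    let columns: Vec<ArrayRef> = target
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            // Column already present: keep it (a real implementation
            // would also verify or cast the data type here).
            Some(col) => Ok(Arc::clone(col)),
            // Missing but nullable: synthesize an all-null column.
            None if field.is_nullable() => {
                Ok(new_null_array(field.data_type(), batch.num_rows()))
            }
            None => Err(ArrowError::SchemaError(format!(
                "required column {} is missing",
                field.name()
            ))),
        })
        .collect::<Result<_, _>>()?;

    RecordBatch::try_new(Arc::clone(target), columns)
}
```

Projecting every batch onto one target schema is what lets the Parquet writer append batches with differing optional columns, like the string-only and int-only attribute batches in the test below, into a single file.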
@@ -409,6 +421,67 @@ mod test {
});
}

#[test]
fn test_adaptive_schema_optional_columns() {
let test_runtime = TestRuntime::<OtapPdata>::new();
let temp_dir = tempfile::tempdir().unwrap();
let base_dir: String = temp_dir.path().to_str().unwrap().into();
let exporter = ParquetExporter::new(config::Config {
base_uri: base_dir.clone(),
partitioning_strategies: None,
writer_options: None,
});
let node_config = Arc::new(NodeUserConfig::new_exporter_config(PARQUET_EXPORTER_URN));
let exporter = ExporterWrapper::<OtapPdata>::local::<ParquetExporter>(
exporter,
test_node(test_runtime.config().name.clone()),
node_config,
test_runtime.config(),
);

test_runtime
.set_exporter(exporter)
.run_test(move |ctx| {
Box::pin(async move {
let batch1: OtapArrowRecords =
fixtures::create_single_logs_pdata_with_attrs(vec![KeyValue {
key: "strkey".to_string(),
value: Some(AnyValue::new_string("terry")),
}])
.try_into()
.unwrap();

let batch2: OtapArrowRecords =
fixtures::create_single_logs_pdata_with_attrs(vec![KeyValue {
key: "intkey".to_string(),
value: Some(AnyValue::new_int(418)),
}])
.try_into()
.unwrap();

// double-check that the two batches have different schemas
let batch1_attrs = batch1.get(ArrowPayloadType::LogAttrs).unwrap();
let batch2_attrs = batch2.get(ArrowPayloadType::LogAttrs).unwrap();
assert_ne!(batch1_attrs.schema(), batch2_attrs.schema());

ctx.send_pdata(batch1.into()).await.unwrap();
ctx.send_pdata(batch2.into()).await.unwrap();

ctx.send_shutdown(Duration::from_millis(200), "test completed")
.await
.unwrap();
})
})
.run_validation(move |_ctx, exporter_result| {
Box::pin(async move {
// check no error
exporter_result.unwrap();
assert_parquet_file_has_rows(&base_dir, ArrowPayloadType::Logs, 2).await;
assert_parquet_file_has_rows(&base_dir, ArrowPayloadType::LogAttrs, 2).await;
})
})
}

async fn wait_table_exists(base_dir: &str, payload_type: ArrowPayloadType) {
let table_name = payload_type.as_str_name().to_lowercase();
loop {
12 changes: 12 additions & 0 deletions rust/otap-dataflow/crates/otap/src/parquet_exporter/error.rs
@@ -0,0 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

/// Definition of errors that could happen when exporting OTAP batches to Parquet
#[derive(thiserror::Error, Debug)]
pub enum ParquetExporterError {
#[error("Invalid record batch: {error}")]
InvalidRecordBatch { error: String },

#[error("Unknown error occurred: {error}")]
UnknownError { error: String },
}
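
For context, `#[derive(thiserror::Error)]` generates the `std::error::Error` and `Display` implementations, with the `#[error(...)]` strings becoming the messages that surface through the exporter's `map_err` (e.g. `Schema transformation failed: ...`). A small self-contained sketch of how the derived `Display` behaves, restating the enum so the snippet compiles on its own:

```rust
// Restated from the module above so this snippet is self-contained.
#[derive(thiserror::Error, Debug)]
pub enum ParquetExporterError {
    #[error("Invalid record batch: {error}")]
    InvalidRecordBatch { error: String },

    #[error("Unknown error occurred: {error}")]
    UnknownError { error: String },
}

fn main() {
    let err = ParquetExporterError::InvalidRecordBatch {
        error: "missing id column".to_string(),
    };
    // The derive turns the #[error(...)] attribute into Display, so this
    // prints: Invalid record batch: missing id column
    println!("{err}");
}
```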
20 changes: 6 additions & 14 deletions rust/otap-dataflow/crates/otap/src/parquet_exporter/idgen.rs
@@ -16,15 +16,7 @@ use otel_arrow_rust::proto::opentelemetry::arrow::v1::ArrowPayloadType;
use otel_arrow_rust::schema::{consts, update_schema_metadata};
use uuid::Uuid;

-/// Definition of errors that could happen when generating an ID
-#[derive(thiserror::Error, Debug)]
-pub enum IdGeneratorError {
-#[error("Invalid record batch: {error}")]
-InvalidRecordBatch { error: String },
-
-#[error("Unknown error occurred: {error}")]
-UnknownError { error: String },
-}
+use super::error::ParquetExporterError;

/// This is an ID generator that converts the batch IDs (which are only unique within a given
/// OTAP Batch) into IDs that can be treated as unique.
@@ -61,12 +53,12 @@ impl PartitionSequenceIdGenerator {
pub fn generate_unique_ids(
&mut self,
otap_batch: &mut OtapArrowRecords,
-) -> Result<(), IdGeneratorError> {
+) -> Result<(), ParquetExporterError> {
// decode the transport optimized IDs if they are not already decoded. This is needed
// to ensure that we are not computing the sequence of IDs based on delta-encoded IDs.
// If the IDs are already decoded, this is a no-op.
otap_batch.decode_transport_optimized_ids().map_err(|e| {
-IdGeneratorError::InvalidRecordBatch {
+ParquetExporterError::InvalidRecordBatch {
error: format!("failed to decode transport optimized IDs: {e}"),
}
})?;
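
The comment above notes that transport-optimized IDs are delta-encoded. Purely as an illustration of that encoding, not the actual `decode_transport_optimized_ids` implementation (which lives in `otel_arrow_rust`), decoding a delta-encoded ID column is a running sum over the deltas:

```rust
/// Illustration only: recover absolute IDs from delta-encoded ones by
/// taking a running sum. This is not the otel_arrow_rust implementation.
fn decode_delta_ids(deltas: &[u32]) -> Vec<u32> {
    let mut acc = 0u32;
    deltas
        .iter()
        .map(|&d| {
            acc += d;
            acc
        })
        .collect()
}

fn main() {
    // On the wire: first absolute ID, then differences between neighbors.
    assert_eq!(decode_delta_ids(&[5, 1, 1, 3]), vec![5, 6, 7, 10]);
}
```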
@@ -157,7 +149,7 @@
&self,
column_name: &str,
record_batch: &RecordBatch,
-) -> Result<RecordBatch, IdGeneratorError> {
+) -> Result<RecordBatch, ParquetExporterError> {
let schema = record_batch.schema_ref();
let id_column_index = match schema.index_of(column_name) {
Ok(index) => index,
@@ -172,15 +164,15 @@
let id_column = match cast(id_column, &DataType::UInt32) {
Ok(id_col) => id_col,
Err(err) => {
-return Err(IdGeneratorError::InvalidRecordBatch {
+return Err(ParquetExporterError::InvalidRecordBatch {
error: format!("could not cast ID column to UInt32: {err}"),
});
}
};
let new_id_col = match add(&UInt32Array::new_scalar(self.curr_max), &id_column) {
Ok(col) => col,
Err(e) => {
-return Err(IdGeneratorError::UnknownError {
+return Err(ParquetExporterError::UnknownError {
error: format!("{e}"),
});
}
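
The ID remapping above is the core of `PartitionSequenceIdGenerator`: cast the batch-local ID column to `UInt32`, then offset every value by the running maximum so IDs from successive batches never collide. A standalone sketch of that arrow-rs pattern, with made-up values for the IDs and the offset:

```rust
use arrow::array::{Array, UInt32Array};
use arrow::compute::kernels::numeric::add;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // IDs that are only unique within a single batch.
    let batch_local_ids = UInt32Array::from(vec![0u32, 1, 2]);
    // Running maximum carried over from previously seen batches
    // (a made-up value for this example).
    let curr_max: u32 = 100;

    // The same kernel call the exporter uses: scalar + array addition.
    let shifted = add(&UInt32Array::new_scalar(curr_max), &batch_local_ids)?;
    let shifted = shifted
        .as_any()
        .downcast_ref::<UInt32Array>()
        .expect("adding two UInt32 operands yields a UInt32 array");

    let values: Vec<u32> = shifted.iter().flatten().collect();
    assert_eq!(values, vec![100, 101, 102]);
    Ok(())
}
```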