diff --git a/rust/experimental/query_engine/ottl-parser/src/editor_expression.rs b/rust/experimental/query_engine/ottl-parser/src/editor_expression.rs new file mode 100644 index 0000000000..4bb2eff3f1 --- /dev/null +++ b/rust/experimental/query_engine/ottl-parser/src/editor_expression.rs @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use data_engine_expressions::{ + DataExpression, MutableValueExpression, ScalarExpression, SetTransformExpression, + TransformExpression, +}; +use data_engine_parser_abstractions::{ParserError, ParserState, to_query_location}; +use pest::iterators::Pair; + +use crate::{ottl_parser::Rule, scalar_expression::parse_scalar_expression}; + +/// Parse an editor expression. +/// +/// For more info on editor expressions, see: +/// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.151.0/pkg/ottl/LANGUAGE.md#editors +pub(crate) fn parse_editor_expression( + rule: Pair<'_, Rule>, + state: &mut ParserState, +) -> Result<(), ParserError> { + let query_location = to_query_location(&rule); + let mut inner_rules = rule.into_inner(); + + if inner_rules.len() != 2 { + return Err(ParserError::SyntaxError( + query_location, + format!( + "{:?} expected two inner rules, found {}", + Rule::editor_expression, + inner_rules.len() + ), + )); + } + + // Parse the name of the editor. 
For complete list of editors, see: + // https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.151.0/pkg/ottl/ottlfuncs#editors + + let editor_name = inner_rules.next().expect("there are two rules"); + let arg_list = inner_rules.next().expect("there are two rules"); + + match editor_name.as_str() { + "set" => { + let (destination, source) = parse_set_expr_args(arg_list, state)?; + state.push_expression(DataExpression::Transform(TransformExpression::Set( + SetTransformExpression::new(query_location, source, destination), + ))); + } + other => { + return Err(ParserError::SyntaxNotSupported( + query_location, + format!("editor function '{other}' not supported"), + )); + } + } + + Ok(()) +} + +/// Parse the destination and source for the "set" editor expression +fn parse_set_expr_args( + args: Pair<'_, Rule>, + state: &mut ParserState, +) -> Result<(MutableValueExpression, ScalarExpression), ParserError> { + let query_location = to_query_location(&args); + let mut inner_rules = args.into_inner(); + if inner_rules.len() != 2 { + return Err(ParserError::SyntaxError( + query_location, + format!( + "set editor expression expected two arg rules, found {}", + inner_rules.len() + ), + )); + } + + let destination = inner_rules.next().expect("there are two rules"); + let dest_query_location = to_query_location(&destination); + let destination = match parse_scalar_expression(destination, state)? 
{ + ScalarExpression::Source(source_scalar_expr) => { + MutableValueExpression::Source(source_scalar_expr) + } + other => { + return Err(ParserError::SyntaxError( + dest_query_location, + format!("expected source scalar expression, found {other:?}"), + )); + } + }; + + let source = parse_scalar_expression(inner_rules.next().expect("there are two rules"), state)?; + + Ok((destination, source)) +} diff --git a/rust/experimental/query_engine/ottl-parser/src/lib.rs b/rust/experimental/query_engine/ottl-parser/src/lib.rs index 76b2432271..f449538d4f 100644 --- a/rust/experimental/query_engine/ottl-parser/src/lib.rs +++ b/rust/experimental/query_engine/ottl-parser/src/lib.rs @@ -1,13 +1,14 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +pub(crate) mod editor_expression; pub(crate) mod ottl_parser; pub(crate) mod scalar_expression; pub(crate) mod scalar_primitive_expression; pub use data_engine_parser_abstractions::parse_standard_bool_literal; pub use data_engine_parser_abstractions::parse_standard_null_literal; -pub use ottl_parser::*; +pub use ottl_parser::OttlParser; // Note: Re-export Parser API surface so users don't need to also depend on // parser-abstractions crate just to parse queries. 
diff --git a/rust/experimental/query_engine/ottl-parser/src/ottl.pest b/rust/experimental/query_engine/ottl-parser/src/ottl.pest index 22f9901f66..b7cc2f994c 100644 --- a/rust/experimental/query_engine/ottl-parser/src/ottl.pest +++ b/rust/experimental/query_engine/ottl-parser/src/ottl.pest @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + // OTTL Grammar is heavily influenced by the following source: // https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/LANGUAGE.md @@ -30,6 +33,8 @@ string_literal = @{ "\"" ~ double_quote_string_char* ~ "\"" } +ident = @{ ("_" | ASCII_ALPHA) ~ ("_" | ASCII_ALPHANUMERIC)* } + // Expressions boolean_expression = _{ true_literal | false_literal } @@ -39,5 +44,24 @@ scalar_expression = { | integer_literal | string_literal | null_literal + | identifier_expression | ("(" ~ scalar_expression ~ ")") } + +identifier_expression = { + ident +} + +argument_list = { scalar_expression? 
~ ("," ~ scalar_expression)* } + +// Equivalent of "editor" from collector-contrib OTTL grammar: +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/655cd21d43a239447aa423839412f7ff06d2e9e8/pkg/ottl/grammar.go#L194-L199 +editor_expression = { identifier_expression ~ "(" ~ argument_list ~ ")" } + +// Equivalent of parsedStatement from collector-contrib OTTL grammar: +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/655cd21d43a239447aa423839412f7ff06d2e9e8/pkg/ottl/grammar.go#L15 +statement_expression = { editor_expression } + +program = { + SOI ~ statement_expression ~ EOI +} diff --git a/rust/experimental/query_engine/ottl-parser/src/ottl_parser.rs b/rust/experimental/query_engine/ottl-parser/src/ottl_parser.rs index f255fbac3f..5892c920dc 100644 --- a/rust/experimental/query_engine/ottl-parser/src/ottl_parser.rs +++ b/rust/experimental/query_engine/ottl-parser/src/ottl_parser.rs @@ -1,9 +1,85 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -use pest_derive::Parser; +use data_engine_parser_abstractions::{ + Parser, ParserError, ParserOptions, ParserResult, ParserState, to_query_location, +}; +use pest::Parser as _; +use pest::iterators::Pair; -#[derive(Parser)] +use crate::editor_expression::parse_editor_expression; + +#[derive(pest_derive::Parser)] #[grammar = "ottl.pest"] #[allow(dead_code)] pub(crate) struct OttlPestParser; + +/// Parser for OTTL Programs +pub struct OttlParser; + +impl Parser for OttlParser { + fn parse_with_options( + query: &str, + options: ParserOptions, + ) -> Result> { + let parse_result = match OttlPestParser::parse(Rule::program, query) { + Ok(rules) => rules, + Err(pest_error) => return Err(vec![ParserError::from_pest_error(query, pest_error)]), + }; + + let mut state = ParserState::new_with_options(query, options); + + for rule in parse_result { + match rule.as_rule() { + Rule::program => { + if let Err(e) = 
parse_program(rule, &mut state) { + return Err(vec![e]); + } + } + invalid_rule => { + let query_location = to_query_location(&rule); + let err = ParserError::SyntaxError( + query_location, + format!("Invalid top-level rule. Expected program, found {invalid_rule:?}"), + ); + return Err(vec![err]); + } + } + } + + Ok(ParserResult::new(state.build()?)) + } +} + +fn parse_program(program_rule: Pair<'_, Rule>, state: &mut ParserState) -> Result<(), ParserError> { + for rule in program_rule.into_inner() { + match rule.as_rule() { + Rule::statement_expression => { + parse_statement_expression(rule, state)?; + } + Rule::EOI => {} + _ => { + let query_location = to_query_location(&rule); + return Err(ParserError::SyntaxError( + query_location, + format!("Invalid child rule found in {:?} {rule:?}", Rule::program), + )); + } + } + } + + Ok(()) +} + +fn parse_statement_expression( + rule: Pair<'_, Rule>, + state: &mut ParserState, +) -> Result<(), ParserError> { + for rule in rule.into_inner() { + if matches!(rule.as_rule(), Rule::editor_expression) { + parse_editor_expression(rule, state)?; + } + } + + Ok(()) +} diff --git a/rust/experimental/query_engine/ottl-parser/src/scalar_expression.rs b/rust/experimental/query_engine/ottl-parser/src/scalar_expression.rs index 01a5a7fdce..9514dcfc1b 100644 --- a/rust/experimental/query_engine/ottl-parser/src/scalar_expression.rs +++ b/rust/experimental/query_engine/ottl-parser/src/scalar_expression.rs @@ -5,7 +5,7 @@ use data_engine_expressions::*; use data_engine_parser_abstractions::*; use pest::iterators::Pair; -use crate::Rule; +use crate::ottl_parser::Rule; #[allow(dead_code)] pub(crate) fn parse_scalar_expression( @@ -29,6 +29,18 @@ pub(crate) fn parse_scalar_expression( } Rule::null_literal => ScalarExpression::Static(parse_standard_null_literal(scalar_rule)), Rule::scalar_expression => parse_scalar_expression(scalar_rule, _state)?, + Rule::identifier_expression => { + // parse this as a field on the source + let query_location 
= to_query_location(&scalar_rule); + let value_accessor = ValueAccessor::new_with_selectors(vec![ScalarExpression::Static( + StaticScalarExpression::String(StringScalarExpression::new( + query_location.clone(), + scalar_rule.as_str(), + )), + )]); + + ScalarExpression::Source(SourceScalarExpression::new(query_location, value_accessor)) + } _ => panic!("Unexpected rule in scalar_expression: {scalar_rule}"), }; @@ -39,7 +51,7 @@ pub(crate) fn parse_scalar_expression( mod tests { use pest::Parser; - use crate::OttlPestParser; + use crate::ottl_parser::OttlPestParser; use super::*; diff --git a/rust/experimental/query_engine/ottl-parser/src/scalar_primitive_expression.rs b/rust/experimental/query_engine/ottl-parser/src/scalar_primitive_expression.rs index ba6519206c..b0ce8ec2b8 100644 --- a/rust/experimental/query_engine/ottl-parser/src/scalar_primitive_expression.rs +++ b/rust/experimental/query_engine/ottl-parser/src/scalar_primitive_expression.rs @@ -3,7 +3,7 @@ #[cfg(test)] mod pest_tests { - use crate::{OttlPestParser, Rule}; + use crate::ottl_parser::{OttlPestParser, Rule}; use data_engine_parser_abstractions::*; #[test] @@ -69,7 +69,7 @@ mod pest_tests { #[cfg(test)] mod parse_tests { - use crate::{OttlPestParser, Rule}; + use crate::ottl_parser::{OttlPestParser, Rule}; use data_engine_parser_abstractions::*; #[test] diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index ae0a1c0cef..cd599ab2f0 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -63,6 +63,7 @@ otap-df-telemetry = { path = "crates/telemetry" } quiver = { package = "otap-df-quiver", path = "crates/quiver" } data_engine_expressions = { path = "../experimental/query_engine/expressions" } data_engine_kql_parser = { path = "../experimental/query_engine/kql-parser" } +data_engine_ottl_parser = { path = "../experimental/query_engine/ottl-parser" } data_engine_parser_abstractions = { path = "../experimental/query_engine/parser-abstractions"} ahash = 
"0.8.11" diff --git a/rust/otap-dataflow/crates/core-nodes/Cargo.toml b/rust/otap-dataflow/crates/core-nodes/Cargo.toml index 2d75be447d..6ffae8641c 100644 --- a/rust/otap-dataflow/crates/core-nodes/Cargo.toml +++ b/rust/otap-dataflow/crates/core-nodes/Cargo.toml @@ -34,11 +34,12 @@ chrono.workspace = true hashbrown.workspace = true data_engine_expressions.workspace = true data_engine_kql_parser.workspace = true +data_engine_ottl_parser.workspace = true futures.workspace = true futures-timer.workspace = true humantime-serde.workspace = true linkme.workspace = true -object_store = {workspace = true, features = ["fs"]} +object_store = { workspace = true, features = ["fs"] } parquet.workspace = true prost.workspace = true rand.workspace = true @@ -66,7 +67,13 @@ weaver_resolver = { workspace = true, optional = true } weaver_semconv = { workspace = true, optional = true } [features] -dev-tools = ["dep:weaver_common", "dep:weaver_forge", "dep:weaver_resolved_schema", "dep:weaver_resolver", "dep:weaver_semconv"] +dev-tools = [ + "dep:weaver_common", + "dep:weaver_forge", + "dep:weaver_resolved_schema", + "dep:weaver_resolver", + "dep:weaver_semconv", +] bench = [] [dev-dependencies] diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/transform_processor/config.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/transform_processor/config.rs index 89710ecf64..7532f42555 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/transform_processor/config.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/transform_processor/config.rs @@ -45,10 +45,19 @@ pub struct Config { #[derive(Debug, Deserialize)] #[serde(rename_all = "snake_case")] +#[allow(clippy::enum_variant_names)] pub enum Query { KqlQuery(String), OplQuery(String), - // TODO - add section to allow transforms to be specified in OTTL + Ottl(OttlConfig), +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct OttlConfig { + /// OTTL Statements for 
transforming logs + pub log_statements: Option>, + // TODO add trace/metrics statements } const fn default_inbound_request_limit() -> NonZeroUsize { diff --git a/rust/otap-dataflow/crates/core-nodes/src/processors/transform_processor/mod.rs b/rust/otap-dataflow/crates/core-nodes/src/processors/transform_processor/mod.rs index 49709cad5f..b0ec46e3fd 100644 --- a/rust/otap-dataflow/crates/core-nodes/src/processors/transform_processor/mod.rs +++ b/rust/otap-dataflow/crates/core-nodes/src/processors/transform_processor/mod.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; use data_engine_expressions::{Expression, PipelineExpression}; use data_engine_kql_parser::{KqlParser, Parser}; +use data_engine_ottl_parser::OttlParser; use linkme::distributed_slice; use otap_df_config::{SignalType, error::Error as ConfigError, node::NodeUserConfig}; use otap_df_engine::{ @@ -65,14 +66,18 @@ pub const TRANSFORM_PROCESSOR_URN: &str = "urn:otel:processor:transform"; /// Opentelemetry Processing Language Processor pub struct TransformProcessor { - pipeline: Pipeline, execution_state: ExecutionState, - signal_scope: SignalScope, + transforms: Vec, contexts: Contexts, metrics: MetricSet, sanitize_results: bool, } +struct Transform { + signal_scope: SignalScope, + pipeline: Pipeline, +} + /// Identifier for which signal types the transformation pipeline should be applied. 
enum SignalScope { // Apply transformation to all signal types @@ -119,15 +124,59 @@ impl TransformProcessor { // TODO we should pass some context to the parser so we can determine if there are valid // identifiers when checking the config: // https://github.com/open-telemetry/otel-arrow/issues/1530 - let parser_options = default_parser_options(); - let pipeline_expr = match &config.query { - Query::KqlQuery(query) => KqlParser::parse_with_options(query, parser_options), - Query::OplQuery(query) => OplParser::parse_with_options(query, parser_options), - } - .map_err(|e| ConfigError::InvalidUserConfig { + + let map_parser_err = |e| ConfigError::InvalidUserConfig { error: format!("Could not parse TransformProcessor query: {e:?}"), - })? - .pipeline; + }; + let parser_options = default_parser_options(); + + let pipeline_options = PipelineOptions { + filter_attribute_keys_case_sensitive: config.filter_attribute_keys_case_sensitive, + }; + + let transforms = match &config.query { + Query::KqlQuery(query) => { + let pipeline_expr = KqlParser::parse_with_options(query, parser_options) + .map_err(map_parser_err)? + .pipeline; + let signal_scope = SignalScope::try_from(&pipeline_expr)?; + vec![Transform { + pipeline: Pipeline::new_with_options(pipeline_expr, pipeline_options), + signal_scope, + }] + } + Query::OplQuery(query) => { + let pipeline_expr = OplParser::parse_with_options(query, parser_options) + .map_err(map_parser_err)? + .pipeline; + let signal_scope = SignalScope::try_from(&pipeline_expr)?; + vec![Transform { + pipeline: Pipeline::new_with_options(pipeline_expr, pipeline_options), + signal_scope, + }] + } + Query::Ottl(ottl_config) => { + let mut transforms = Vec::new(); + if let Some(log_statements) = &ottl_config.log_statements { + for statement in log_statements { + let pipeline_expr = + OttlParser::parse_with_options(statement, parser_options.clone()) + .map_err(map_parser_err)? 
+ .pipeline; + + transforms.push(Transform { + pipeline: Pipeline::new_with_options( + pipeline_expr, + pipeline_options.clone(), + ), + signal_scope: SignalScope::Signal(SignalType::Logs), + }) + } + } + + transforms + } + }; // TODO: it would be nice if we could validate that the pipeline expr is supported by the // query engine here. Currently, validation happens lazily when the first batch is seen. @@ -136,13 +185,8 @@ impl TransformProcessor { let mut execution_state = ExecutionState::new(); execution_state.set_extension::(Box::new(RouterImpl::new())); - let pipeline_options = PipelineOptions { - filter_attribute_keys_case_sensitive: config.filter_attribute_keys_case_sensitive, - }; - Ok(Self { - signal_scope: SignalScope::try_from(&pipeline_expr)?, - pipeline: Pipeline::new_with_options(pipeline_expr, pipeline_options), + transforms, metrics: pipeline_ctx.register_metrics::(), contexts: Contexts::new(config.inbound_request_limit, config.outbound_request_limit), execution_state, @@ -150,14 +194,6 @@ impl TransformProcessor { }) } - /// determine if the transformation should be applied to this pdata, or if it should be skipped - fn should_process(&self, pdata: &OtapPayload) -> bool { - match self.signal_scope { - SignalScope::All => true, - SignalScope::Signal(signal_type) => signal_type == pdata.signal_type(), - } - } - /// sends any result batches that were produced by the pipeline to the appropriate output ports /// while managing subscriptions and context async fn handle_exec_result( @@ -404,19 +440,46 @@ impl Processor for TransformProcessor { }, Message::PData(pdata) => { let (context, payload) = pdata.into_parts(); - if !self.should_process(&payload) { - // skip handling this pdata - effect_handler - .send_message_with_source_node(OtapPdata::new(context, payload)) - .await?; - } else { - let mut otap_batch: OtapArrowRecords = payload.try_into()?; + let pdata_signal_type = payload.signal_type(); + let mut payload = Some(payload); + let mut transformed = 
false; + let mut transform_error = None; + + // Execute all transforms. We skip transforms where the batch's signal type is not + // selected by the signal scope, and lazily convert the pdata payload to OTAP + // if/when we find a transform to apply. If any transform error occurs, break early + // and set transform_error to `Some`. + // + // State at the end of this loop: + // - Either payload or `transform_error` will be `Some` + // - If we applied any transform then: + // - `transformed` will be set to `true` + // - if payload is `Some` then contained payload variant will be OtapArrowRecords + for transform in &mut self.transforms { + let should_process = match &transform.signal_scope { + SignalScope::All => true, + SignalScope::Signal(scope_signal_type) => { + pdata_signal_type == *scope_signal_type + } + }; + + if !should_process { + // skip applying this transform as it does not select the signal type + continue; + } + transformed = true; + + // convert payload to OTAP & remove delta encoded IDs. + // safety: we know payload will have been initialized to Some either before + // entering the loop, or during the previous iteration. 
+ let mut otap_batch: OtapArrowRecords = + payload.take().expect("payload initialized").try_into()?; otap_batch.decode_transport_optimized_ids()?; - let result = self + + let result = transform .pipeline .execute_with_state(otap_batch, &mut self.execution_state) .await - .inspect(|_| self.metrics.msgs_transformed.inc()) .map_err(|e| { self.metrics.msgs_transform_failed.inc(); EngineError::ProcessorError { @@ -427,9 +490,49 @@ impl Processor for TransformProcessor { } }); + match result { + Ok(next_result) => { + // initialize payload for the next loop iteration + payload = Some(OtapPayload::OtapArrowRecords(next_result)); + } + Err(e) => { + transform_error = Some(e); + break; + } + } + } + + if transformed { + self.metrics.msgs_transformed.inc(); + let result = match transform_error { + Some(e) => Err(e), + None => { + // safety: since error is `None`, we know payload must be `Some` based + // on the logic in the loop above, so it is safe to expect here + match payload.take().expect("payload option initialized") { + OtapPayload::OtapArrowRecords(otap_batch) => Ok(otap_batch), + _ => { + // safety: if any transform applied then we'll have converted + // the payload the OTAP, so we know here that it must be this + // variant of OtapPayload + unreachable!("expected OTAP payload variant") + } + } + } + }; self.handle_exec_result(context, result, effect_handler) .await?; - }; + } else { + // safety: payload is initialized to Some, and only modified if any transforms + // are applied. 
In this location, we know no transforms were applied so we can + // safely expect take here to return Some + let payload = payload.take().expect("payload option initialized"); + + // all transforms were skipped for this pdata, just forward the original payload + effect_handler + .send_message_with_source_node(OtapPdata::new(context, payload)) + .await?; + } } }; @@ -694,6 +797,76 @@ mod test { }); } + #[test] + fn test_simple_ottl_pipeline() { + let runtime = TestRuntime::::new(); + let processor = try_create_with_config( + json!({ + "ottl": { + "log_statements": [ + "set(severity_text, \"ERROR\")" + ] + } + }), + &runtime, + ) + .expect("created processor"); + runtime + .set_processor(processor) + .run_test(|mut ctx| async move { + let log_records = vec![ + LogRecord::build().severity_text("INFO").finish(), + LogRecord::build().severity_text("ERROR").finish(), + ]; + + let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(LogsData { + resource_logs: vec![ResourceLogs::new( + Resource::default(), + vec![ScopeLogs::new( + InstrumentationScope::default(), + log_records.clone(), + )], + )], + })); + + let pdata = OtapPdata::new_default(otap_batch.into()); + ctx.process(Message::PData(pdata)) + .await + .expect("no process error"); + + let out = ctx + .drain_pdata() + .await + .into_iter() + .map(OtapPdata::payload) + .map(OtapArrowRecords::try_from) + .map(Result::unwrap); + let otap_batch = out.into_iter().next().unwrap(); + + let result = otap_to_otlp(&otap_batch); + + match result { + OtlpProtoMessage::Logs(logs_data) => { + assert_eq!(logs_data.resource_logs.len(), 1); + assert_eq!(logs_data.resource_logs[0].scope_logs.len(), 1); + assert_eq!( + logs_data.resource_logs[0].scope_logs[0].log_records.len(), + 2 + ); + for log_record in &logs_data.resource_logs[0].scope_logs[0].log_records { + assert_eq!(log_record.severity_text, "ERROR") + } + } + invalid => { + panic!( + "invalid signal type from output. 
Expected logs, received {invalid:?}" + ) + } + } + }) + .validate(|_ctx| async move {}); + } + #[test] fn test_calling_pipeline_with_function_call() { let runtime = TestRuntime::::new(); diff --git a/rust/otap-dataflow/crates/query-engine/Cargo.toml b/rust/otap-dataflow/crates/query-engine/Cargo.toml index 4987dff46a..63c16d3a15 100644 --- a/rust/otap-dataflow/crates/query-engine/Cargo.toml +++ b/rust/otap-dataflow/crates/query-engine/Cargo.toml @@ -29,6 +29,7 @@ otap-df-pdata = { workspace = true, features = ["testing"] } [dev-dependencies] criterion = { workspace = true, features = ["async_tokio"] } data_engine_kql_parser = { workspace = true } +data_engine_ottl_parser = { workspace = true } data_engine_parser_abstractions = { workspace = true } pretty_assertions = { workspace = true } prost = { workspace = true } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs index 9e77d37bbf..7b4f4e9fd9 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs @@ -242,6 +242,7 @@ impl PlannedPipeline { } /// Options for pipeline +#[derive(Clone)] pub struct PipelineOptions { /// Whether to treat attribute key match as case sensitive during filtering stages pub filter_attribute_keys_case_sensitive: bool, diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index fcd1649c9a..bc7bf41286 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -1740,6 +1740,7 @@ mod test { datatypes::DataType, }; use data_engine_kql_parser::{KqlParser, Parser}; + use data_engine_ottl_parser::OttlParser; use otap_df_opl::parser::OplParser; use otap_df_pdata::{ OtapArrowRecords, @@ -1792,6 +1793,21 @@ mod test { test_insert_root_column_from_scalar::().await } + #[tokio::test] + 
async fn test_insert_root_column_from_scalar_ottl_parser() { + let logs_data = to_logs_data(vec![ + LogRecord::build().finish(), + LogRecord::build().finish(), + ]); + let query = "set(severity_text, \"ERROR\")"; + let result = exec_logs_pipeline::(query, logs_data).await; + let logs_records = result.resource_logs[0].scope_logs[0].log_records.clone(); + assert_eq!(logs_records.len(), 2); + for logs_record in logs_records { + assert_eq!(logs_record.severity_text, "ERROR"); + } + } + async fn test_set_multiple_root_columns() { let logs_data = to_logs_data(vec![ LogRecord::build().severity_text("INFO").finish(),