Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

use data_engine_expressions::{
DataExpression, MutableValueExpression, ScalarExpression, SetTransformExpression,
TransformExpression,
};
use data_engine_parser_abstractions::{ParserError, ParserState, to_query_location};
use pest::iterators::Pair;

use crate::{ottl_parser::Rule, scalar_expression::parse_scalar_expression};

/// Parse an editor expression and push the resulting transform onto `state`.
///
/// The rule is expected to contain exactly two inner rules: the editor name
/// followed by its argument list. Only the `set` editor is handled; any other
/// name is reported as [`ParserError::SyntaxNotSupported`].
///
/// For more info on editor expressions, see:
/// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.151.0/pkg/ottl/LANGUAGE.md#editors
///
/// # Errors
///
/// Returns [`ParserError::SyntaxError`] when the number of inner rules is not
/// two, and forwards any error produced while parsing the arguments.
pub(crate) fn parse_editor_expression(
    rule: Pair<'_, Rule>,
    state: &mut ParserState,
) -> Result<(), ParserError> {
    let location = to_query_location(&rule);
    let mut children = rule.into_inner();

    let child_count = children.len();
    if child_count != 2 {
        let message = format!(
            "{:?} expected two inner rule, found {}",
            Rule::editor_expression,
            child_count
        );
        return Err(ParserError::SyntaxError(location, message));
    }

    // The count was checked above, so both `next()` calls yield a rule.
    let name_rule = children.next().expect("there are two rules");
    let args_rule = children.next().expect("there are two rules");

    // Dispatch on the editor name. For the complete list of editors, see:
    // https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.151.0/pkg/ottl/ottlfuncs#editors
    let transform = match name_rule.as_str() {
        "set" => {
            let (destination, source) = parse_set_expr_args(args_rule, state)?;
            TransformExpression::Set(SetTransformExpression::new(location, source, destination))
        }
        other => {
            return Err(ParserError::SyntaxNotSupported(
                location,
                format!("editor function '{other}' not supported"),
            ));
        }
    };

    state.push_expression(DataExpression::Transform(transform));

    Ok(())
}

/// Parse the destination and source arguments of a `set` editor expression.
///
/// `args` must contain exactly two inner rules. The first is parsed as the
/// destination and must resolve to a source scalar expression (the only kind
/// accepted here as a mutable target); the second is parsed as the value to
/// assign.
///
/// # Errors
///
/// Returns [`ParserError::SyntaxError`] when the argument count is not two or
/// when the destination is not a source scalar expression, and forwards any
/// error from parsing either argument.
fn parse_set_expr_args(
    args: Pair<'_, Rule>,
    state: &mut ParserState,
) -> Result<(MutableValueExpression, ScalarExpression), ParserError> {
    let query_location = to_query_location(&args);
    let mut inner_rules = args.into_inner();
    if inner_rules.len() != 2 {
        return Err(ParserError::SyntaxError(
            query_location,
            format!(
                "set editor expression expected two args rule, found {}",
                inner_rules.len()
            ),
        ));
    }

    // The count was checked above, so both `next()` calls yield a rule.
    let destination_rule = inner_rules.next().expect("there are two rules");
    let source_rule = inner_rules.next().expect("there are two rules");

    // Capture the location before the rule is consumed so a rejected
    // destination can be reported at its own position.
    let dest_query_location = to_query_location(&destination_rule);
    let destination = match parse_scalar_expression(destination_rule, state)? {
        ScalarExpression::Source(source_scalar_expr) => {
            MutableValueExpression::Source(source_scalar_expr)
        }
        other => {
            return Err(ParserError::SyntaxError(
                dest_query_location,
                format!("expected source scalar expression, found {other:?}"),
            ));
        }
    };

    let source = parse_scalar_expression(source_rule, state)?;

    Ok((destination, source))
}
3 changes: 2 additions & 1 deletion rust/experimental/query_engine/ottl-parser/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod editor_expression;
pub(crate) mod ottl_parser;
pub(crate) mod scalar_expression;
pub(crate) mod scalar_primitive_expression;

pub use data_engine_parser_abstractions::parse_standard_bool_literal;
pub use data_engine_parser_abstractions::parse_standard_null_literal;
pub use ottl_parser::*;
pub use ottl_parser::OttlParser;

// Note: Re-export Parser API surface so users don't need to also depend on
// parser-abstractions crate just to parse queries.
Expand Down
24 changes: 24 additions & 0 deletions rust/experimental/query_engine/ottl-parser/src/ottl.pest
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// OTTL Grammar is heavily influenced by the following source:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/LANGUAGE.md

Expand Down Expand Up @@ -30,6 +33,8 @@ string_literal = @{
"\"" ~ double_quote_string_char* ~ "\""
}

ident = @{ ("_" | ASCII_ALPHA) ~ ("_" | ASCII_ALPHANUMERIC)* }

// Expressions
boolean_expression = _{ true_literal | false_literal }

Expand All @@ -39,5 +44,24 @@ scalar_expression = {
| integer_literal
| string_literal
| null_literal
| identifier_expression
| ("(" ~ scalar_expression ~ ")")
}

identifier_expression = {
ident
}

// Zero or more comma-separated arguments. The whole list is optional (rather
// than only its first element) so that a leading comma such as `set(, a)` is
// rejected by the grammar.
argument_list = { (scalar_expression ~ ("," ~ scalar_expression)*)? }

// Equivalent of "editor" from collector-contrib OTTL grammar:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/655cd21d43a239447aa423839412f7ff06d2e9e8/pkg/ottl/grammar.go#L194-L199
editor_expression = { identifier_expression ~ "(" ~ argument_list ~ ")" }

// Equivalent of parsedStatement from collector-contrib OTTL grammar:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/655cd21d43a239447aa423839412f7ff06d2e9e8/pkg/ottl/grammar.go#L15
statement_expression = { editor_expression }

program = {
SOI ~ statement_expression ~ EOI
}
80 changes: 78 additions & 2 deletions rust/experimental/query_engine/ottl-parser/src/ottl_parser.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,85 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

use pest_derive::Parser;
use data_engine_parser_abstractions::{
Parser, ParserError, ParserOptions, ParserResult, ParserState, to_query_location,
};
use pest::Parser as _;
use pest::iterators::Pair;

#[derive(Parser)]
use crate::editor_expression::parse_editor_expression;

#[derive(pest_derive::Parser)]
#[grammar = "ottl.pest"]
#[allow(dead_code)]
pub(crate) struct OttlPestParser;

/// Parser for OTTL Programs
pub struct OttlParser;

impl Parser for OttlParser {
    /// Parse `query` as an OTTL program using the supplied `options`.
    ///
    /// The text is first run through the pest grammar starting at
    /// `Rule::program`; each resulting top-level rule is then fed into a
    /// fresh [`ParserState`], which is finally built into the result.
    ///
    /// # Errors
    ///
    /// Returns a single-element vector for a pest failure, for a top-level
    /// rule other than `program`, or for the first error raised while parsing
    /// the program. Errors from building the state are forwarded as-is.
    fn parse_with_options(
        query: &str,
        options: ParserOptions,
    ) -> Result<ParserResult, Vec<ParserError>> {
        let top_level_rules = OttlPestParser::parse(Rule::program, query)
            .map_err(|pest_error| vec![ParserError::from_pest_error(query, pest_error)])?;

        let mut state = ParserState::new_with_options(query, options);

        for rule in top_level_rules {
            match rule.as_rule() {
                Rule::program => parse_program(rule, &mut state).map_err(|e| vec![e])?,
                invalid_rule => {
                    let message = format!(
                        "Invalid top-level rule. Expected program, found {invalid_rule:?}"
                    );
                    return Err(vec![ParserError::SyntaxError(
                        to_query_location(&rule),
                        message,
                    )]);
                }
            }
        }

        let built = state.build()?;
        Ok(ParserResult::new(built))
    }
}

/// Walk the children of a `program` rule, parsing each statement into `state`.
///
/// `EOI` is skipped; any child that is neither a statement nor `EOI` stops
/// the walk with a [`ParserError::SyntaxError`] at that child's location.
fn parse_program(program_rule: Pair<'_, Rule>, state: &mut ParserState) -> Result<(), ParserError> {
    program_rule
        .into_inner()
        .try_for_each(|rule| match rule.as_rule() {
            Rule::statement_expression => parse_statement_expression(rule, state),
            // End-of-input marker carries nothing to parse.
            Rule::EOI => Ok(()),
            _ => Err(ParserError::SyntaxError(
                to_query_location(&rule),
                format!("Invalid child rule found in {:?} {rule:?}", Rule::program),
            )),
        })
}

fn parse_statement_expression(
rule: Pair<'_, Rule>,
state: &mut ParserState,
) -> Result<(), ParserError> {
for rule in rule.into_inner() {
if matches!(rule.as_rule(), Rule::editor_expression) {
parse_editor_expression(rule, state)?;
}
}

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use data_engine_expressions::*;
use data_engine_parser_abstractions::*;
use pest::iterators::Pair;

use crate::Rule;
use crate::ottl_parser::Rule;

#[allow(dead_code)]
pub(crate) fn parse_scalar_expression(
Expand All @@ -29,6 +29,18 @@ pub(crate) fn parse_scalar_expression(
}
Rule::null_literal => ScalarExpression::Static(parse_standard_null_literal(scalar_rule)),
Rule::scalar_expression => parse_scalar_expression(scalar_rule, _state)?,
Rule::identifier_expression => {
// parse this as a field on the source
let query_location = to_query_location(&scalar_rule);
let value_accessor = ValueAccessor::new_with_selectors(vec![ScalarExpression::Static(
StaticScalarExpression::String(StringScalarExpression::new(
query_location.clone(),
scalar_rule.as_str(),
)),
)]);

ScalarExpression::Source(SourceScalarExpression::new(query_location, value_accessor))
}
_ => panic!("Unexpected rule in scalar_expression: {scalar_rule}"),
};

Expand All @@ -39,7 +51,7 @@ pub(crate) fn parse_scalar_expression(
mod tests {
use pest::Parser;

use crate::OttlPestParser;
use crate::ottl_parser::OttlPestParser;

use super::*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#[cfg(test)]
mod pest_tests {
use crate::{OttlPestParser, Rule};
use crate::ottl_parser::{OttlPestParser, Rule};
use data_engine_parser_abstractions::*;

#[test]
Expand Down Expand Up @@ -69,7 +69,7 @@ mod pest_tests {

#[cfg(test)]
mod parse_tests {
use crate::{OttlPestParser, Rule};
use crate::ottl_parser::{OttlPestParser, Rule};
use data_engine_parser_abstractions::*;

#[test]
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ otap-df-telemetry = { path = "crates/telemetry" }
quiver = { package = "otap-df-quiver", path = "crates/quiver" }
data_engine_expressions = { path = "../experimental/query_engine/expressions" }
data_engine_kql_parser = { path = "../experimental/query_engine/kql-parser" }
data_engine_ottl_parser = { path = "../experimental/query_engine/ottl-parser" }
data_engine_parser_abstractions = { path = "../experimental/query_engine/parser-abstractions"}

ahash = "0.8.11"
Expand Down
11 changes: 9 additions & 2 deletions rust/otap-dataflow/crates/core-nodes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ chrono.workspace = true
hashbrown.workspace = true
data_engine_expressions.workspace = true
data_engine_kql_parser.workspace = true
data_engine_ottl_parser.workspace = true
futures.workspace = true
futures-timer.workspace = true
humantime-serde.workspace = true
linkme.workspace = true
object_store = {workspace = true, features = ["fs"]}
object_store = { workspace = true, features = ["fs"] }
parquet.workspace = true
prost.workspace = true
rand.workspace = true
Expand Down Expand Up @@ -66,7 +67,13 @@ weaver_resolver = { workspace = true, optional = true }
weaver_semconv = { workspace = true, optional = true }

[features]
dev-tools = ["dep:weaver_common", "dep:weaver_forge", "dep:weaver_resolved_schema", "dep:weaver_resolver", "dep:weaver_semconv"]
dev-tools = [
"dep:weaver_common",
"dep:weaver_forge",
"dep:weaver_resolved_schema",
"dep:weaver_resolver",
"dep:weaver_semconv",
]
bench = []

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,19 @@ pub struct Config {

/// Query definition that can be deserialized from configuration.
///
/// Variant names are renamed to snake_case by serde (`kql_query`,
/// `opl_query`, `ottl`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
// `KqlQuery` and `OplQuery` repeat the enum's name, which this clippy lint
// flags; renaming them would also change their serialized names.
#[allow(clippy::enum_variant_names)]
pub enum Query {
/// Query given as a plain string (presumably KQL text, going by the variant
/// name — confirm against the code that consumes it).
KqlQuery(String),
/// Query given as a plain string (presumably OPL text, going by the variant
/// name — confirm against the code that consumes it).
OplQuery(String),
/// Transform specified as OTTL statements; see [`OttlConfig`].
Ottl(OttlConfig),
}

/// OTTL section of a query definition, carried by the `Ottl` variant of
/// [`Query`].
///
/// Only log statements can be specified for now.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct OttlConfig {
/// OTTL Statements for transforming logs. The field is optional; each entry
/// is one statement kept as raw text.
pub log_statements: Option<Vec<String>>,
// TODO add trace/metrics statements
}

const fn default_inbound_request_limit() -> NonZeroUsize {
Expand Down
Loading
Loading