Skip to content
This repository was archived by the owner on Feb 27, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data_types/src/partition_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ use std::{
cmp::min,
collections::{HashMap, HashSet},
fmt::{Display, Formatter, Write},
ops::{Add, Range},
ops::Range,
sync::{Arc, LazyLock},
};

Expand Down Expand Up @@ -1470,7 +1470,7 @@ impl PartitionDuration {
}
}

impl<Tz: chrono::TimeZone> Add<PartitionDuration> for chrono::DateTime<Tz> {
impl<Tz: chrono::TimeZone> std::ops::Add<PartitionDuration> for chrono::DateTime<Tz> {
type Output = Self;
/// Add a [`PartitionDuration`] to a [`chrono::DateTime`].
fn add(self, rhs: PartitionDuration) -> Self::Output {
Expand Down
2 changes: 1 addition & 1 deletion flightsql/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Display for PreparedStatementHandle {
write!(
f,
",{}",
pretty_format_batches(&[batch.clone()]).map_err(|_| std::fmt::Error)?
pretty_format_batches(std::slice::from_ref(batch)).map_err(|_| std::fmt::Error)?
)?
};
write!(f, ")")
Expand Down
8 changes: 4 additions & 4 deletions influxdb_line_protocol/src/v3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ fn field_family_normal_char(i: &str) -> IResult<&str, &str> {
if c == ':' {
// Peek to see if the next char is also a colon, and if not,
// keep consuming.
if let Some((_, next_ch)) = iter.peek() {
if *next_ch != ':' {
continue;
}
if let Some((_, next_ch)) = iter.peek()
&& *next_ch != ':'
{
continue;
}
}

Expand Down
7 changes: 4 additions & 3 deletions iox_query/src/exec/gapfill/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,10 @@ impl GapFillExec {
let schema = input.schema();
let eq_properties = match input.properties().output_ordering() {
None => EquivalenceProperties::new(schema),
Some(output_ordering) => {
EquivalenceProperties::new_with_orderings(schema, &[output_ordering.clone()])
}
Some(output_ordering) => EquivalenceProperties::new_with_orderings(
schema,
std::slice::from_ref(output_ordering),
),
};

let output_partitioning = Partitioning::UnknownPartitioning(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,15 @@ impl TreeNodeRewriter for InfluxRegexToDataFusionRegex {
let name = func.name();
if (args.len() == 2)
&& ((name == REGEX_MATCH_UDF_NAME) || (name == REGEX_NOT_MATCH_UDF_NAME))
&& let Expr::Literal(ScalarValue::Utf8(Some(s)), _) = &args[1]
{
if let Expr::Literal(ScalarValue::Utf8(Some(s)), _) = &args[1] {
let s = clean_non_meta_escapes(s);
let op = match name {
REGEX_MATCH_UDF_NAME => Operator::RegexMatch,
REGEX_NOT_MATCH_UDF_NAME => Operator::RegexNotMatch,
_ => unreachable!(),
};
return Ok(Transformed::yes(binary_expr(args.remove(0), op, lit(s))));
}
let s = clean_non_meta_escapes(s);
let op = match name {
REGEX_MATCH_UDF_NAME => Operator::RegexMatch,
REGEX_NOT_MATCH_UDF_NAME => Operator::RegexNotMatch,
_ => unreachable!(),
};
return Ok(Transformed::yes(binary_expr(args.remove(0), op, lit(s))));
}

Ok(Transformed::yes(Expr::ScalarFunction(ScalarFunction {
Expand Down
19 changes: 9 additions & 10 deletions iox_query/src/physical_optimizer/sort/merge_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,16 @@ pub fn merge_partitions_after_parallelized_sorting(
} else {
// If all lexical ranges are the same, then the partitions are a result of repartitioning. Insert an SPM above the sort.
if let Some(lexical_ranges) = extract_ranges_from_plan(ordering_req, &ctx.plan)?
&& lexical_ranges.iter().dedup().collect_vec().len() == 1
{
if lexical_ranges.iter().dedup().collect_vec().len() == 1 {
let plan = add_sort_preserving_merge(
Arc::clone(&ctx.plan),
sort_exec.expr(),
sort_exec.fetch(),
)?;
let mut new_ctx = MergePartitionsContext::new_default(plan);
new_ctx.data.has_merged_parallelized_sort = true;
return Ok(Transformed::yes(new_ctx));
}
let plan = add_sort_preserving_merge(
Arc::clone(&ctx.plan),
sort_exec.expr(),
sort_exec.fetch(),
)?;
let mut new_ctx = MergePartitionsContext::new_default(plan);
new_ctx.data.has_merged_parallelized_sort = true;
return Ok(Transformed::yes(new_ctx));
};

Ok(Transformed::no(ctx))
Expand Down
4 changes: 2 additions & 2 deletions iox_query/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ mod test {

// simple plan
let plan = provider
.scan(&state, None, &[pred.clone()], None)
.scan(&state, None, std::slice::from_ref(&pred), None)
.await
.unwrap();
insta::assert_yaml_snapshot!(
Expand All @@ -563,7 +563,7 @@ mod test {

// projection
let plan = provider
.scan(&state, Some(&vec![1, 3]), &[pred.clone()], None)
.scan(&state, Some(&vec![1, 3]), std::slice::from_ref(&pred), None)
.await
.unwrap();
insta::assert_yaml_snapshot!(
Expand Down
12 changes: 7 additions & 5 deletions iox_query/src/provider/deduplicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,10 @@ impl DeduplicateExec {
sort_keys: &LexOrdering,
) -> PlanProperties {
trace!("Deduplicate output ordering: {:?}", sort_keys);
let eq_properties =
EquivalenceProperties::new_with_orderings(input.schema(), &[sort_keys.clone()]);
let eq_properties = EquivalenceProperties::new_with_orderings(
input.schema(),
std::slice::from_ref(sort_keys),
);

let output_partitioning = Partitioning::UnknownPartitioning(1);

Expand Down Expand Up @@ -921,7 +923,7 @@ mod test {
"| | | 1.0 |",
"+----+----+-----+",
];
assert_batches_eq!(&expected_input_batch, &[b1.clone()]);
assert_batches_eq!(&expected_input_batch, std::slice::from_ref(&b1));

// sort on t1, t2
let sort_keys = vec![
Expand Down Expand Up @@ -1084,7 +1086,7 @@ mod test {
"| b | | 1.0 |",
"+----+----+-----+",
];
assert_batches_eq!(&expected_input_batch, &[b1.clone()]);
assert_batches_eq!(&expected_input_batch, std::slice::from_ref(&b1));

// sort on t1, t2
let sort_keys = vec![
Expand Down Expand Up @@ -1251,7 +1253,7 @@ mod test {
"| b | a | 1.0 |",
"+----+----+-----+",
];
assert_batches_eq!(&expected_input_batch, &[b1.clone()]);
assert_batches_eq!(&expected_input_batch, std::slice::from_ref(&b1));

// sort on t1, t2
let sort_keys = vec![
Expand Down
7 changes: 4 additions & 3 deletions iox_query_influxql/src/frontend/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ impl SchemaExec {
fn compute_properties(input: &Arc<dyn ExecutionPlan>, schema: SchemaRef) -> PlanProperties {
let eq_properties = match input.properties().output_ordering() {
None => EquivalenceProperties::new(schema),
Some(output_ordering) => {
EquivalenceProperties::new_with_orderings(schema, &[output_ordering.clone()])
}
Some(output_ordering) => EquivalenceProperties::new_with_orderings(
schema,
std::slice::from_ref(output_ordering),
),
};

let output_partitioning = input.output_partitioning().clone();
Expand Down
1 change: 1 addition & 0 deletions iox_query_influxql/src/plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
.tag_names()
.map(|ident| ident.as_str())
.sorted()
.dedup()
.collect()
} else {
vec![]
Expand Down
14 changes: 6 additions & 8 deletions iox_query_influxql/src/plan/rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,12 @@ impl RewriteSelect {
let (fields, mut group_by) = if has_field_wildcard || has_group_by_wildcard {
let (field_set, mut tag_set) = from_field_and_dimensions(s, from)?;

if !has_group_by_wildcard {
if let Some(group_by) = &stmt.group_by {
// Remove any explicitly listed tags in the GROUP BY clause, so they are not
// expanded by any wildcards specified in the SELECT projection list
group_by.tag_names().for_each(|ident| {
tag_set.remove(ident.as_str());
});
}
if !has_group_by_wildcard && let Some(group_by) = &stmt.group_by {
// Remove any explicitly listed tags in the GROUP BY clause, so they are not
// expanded by any wildcards specified in the SELECT projection list
group_by.tag_names().for_each(|ident| {
tag_set.remove(ident.as_str());
});
}

let fields = if has_field_wildcard {
Expand Down
4 changes: 2 additions & 2 deletions mutable_batch_lp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ m f1=10i 1639612800000000000
"| | 10 | 2021-12-16T00:00:00Z |",
"+-----+----+----------------------+",
],
&[batch.clone()]
std::slice::from_ref(&batch)
);

// Verify the nullness of the string column ("" not the same as null)
Expand Down Expand Up @@ -542,7 +542,7 @@ m b=t 1639612800000000000
"| true | | 2021-12-16T00:00:00Z | |",
"+------+---+----------------------+---+",
],
&[batch.clone()]
std::slice::from_ref(&batch)
);

// Verify the nullness of the int column
Expand Down
79 changes: 3 additions & 76 deletions predicate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ pub mod rpc_predicate;

use data_types::TimestampRange;
use datafusion::{
common::tree_node::{TreeNodeRecursion, TreeNodeVisitor},
error::DataFusionError,
logical_expr::{BinaryExpr, binary_expr},
prelude::{Expr, col},
};
Expand Down Expand Up @@ -362,12 +360,10 @@ impl TryFrom<Expr> for ValueExpr {
op: _,
right: _,
}) = &expr
&& let Expr::Column(inner) = left.as_ref()
&& inner.name == VALUE_COLUMN_NAME
{
if let Expr::Column(inner) = left.as_ref()
&& inner.name == VALUE_COLUMN_NAME
{
return Ok(Self { expr });
}
return Ok(Self { expr });
}
Err(expr)
}
Expand All @@ -391,75 +387,6 @@ impl From<ValueExpr> for Expr {
}
}

/// Recursively walk an expression tree, checking if the expression is
/// row-based.
///
/// A row-based function takes one row in and produces
/// one value as output.
///
/// Note that even though a predicate expression like `col < 5` can be used to
/// filter rows, the expression itself is row-based (produces a single boolean).
///
/// Examples of non row based expressions are Aggregate and
/// Window function which produce different cardinality than their
/// input.
struct RowBasedVisitor {
row_based: bool,
}

impl Default for RowBasedVisitor {
fn default() -> Self {
Self { row_based: true }
}
}

impl TreeNodeVisitor<'_> for RowBasedVisitor {
type Node = Expr;

fn f_down(&mut self, expr: &Expr) -> Result<TreeNodeRecursion, DataFusionError> {
match expr {
Expr::Alias(_)
| Expr::Between { .. }
| Expr::BinaryExpr { .. }
| Expr::Case { .. }
| Expr::Cast { .. }
| Expr::Column(_)
| Expr::Exists { .. }
| Expr::InList { .. }
| Expr::InSubquery { .. }
| Expr::IsFalse(_)
| Expr::IsNotFalse(_)
| Expr::IsNotNull(_)
| Expr::IsNotTrue(_)
| Expr::IsNotUnknown(_)
| Expr::IsNull(_)
| Expr::IsTrue(_)
| Expr::IsUnknown(_)
| Expr::Like { .. }
| Expr::Literal(_, _)
| Expr::Negative(_)
| Expr::Not(_)
| Expr::OuterReferenceColumn(_, _)
| Expr::Placeholder { .. }
| Expr::ScalarFunction { .. }
| Expr::ScalarSubquery(_)
| Expr::ScalarVariable(_, _)
| Expr::SimilarTo { .. }
| Expr::TryCast { .. } => Ok(TreeNodeRecursion::Continue),
// Exhaustive matching requires us to also match deprecated variants
#[expect(deprecated)]
Expr::Wildcard { .. } => Ok(TreeNodeRecursion::Continue),
Expr::AggregateFunction { .. }
| Expr::GroupingSet(_)
| Expr::WindowFunction { .. }
| Expr::Unnest(_) => {
self.row_based = false;
Ok(TreeNodeRecursion::Stop)
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion predicate/src/rpc_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ mod tests {
.unwrap()
}

#[expect(dead_code)]
const fn assert_send<T: Send>() {}

// `InfluxRpcPredicate` shall be `Send`, otherwise we will have problems constructing plans for InfluxRPC
Expand Down
5 changes: 2 additions & 3 deletions predicate/src/rpc_predicate/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,9 @@ fn simplify_predicate_inner(expr: Expr) -> Result<Transformed<Expr>> {
return Ok(Transformed::yes(*right));
}
} else if let (Some(coll), Some(colr)) = (is_col_op_lit(&left), is_col_not_null(&right))
&& colr == coll
{
if colr == coll {
return Ok(Transformed::yes(*left));
}
return Ok(Transformed::yes(*left));
};

Ok(Transformed::no(Expr::BinaryExpr(BinaryExpr {
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.88.0"
channel = "1.89.0"
components = ["rustfmt", "clippy"]
2 changes: 0 additions & 2 deletions tracker/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ pub struct InstrumentedRawLock<R: Sized> {
unsafe impl<R: lock_api::RawRwLock + Sized> lock_api::RawRwLock for InstrumentedRawLock<R> {
// A “non-constant” const item is a legacy way to supply an initialized value to downstream
// static items. Can hopefully be replaced with `const fn new() -> Self` at some point.
#[expect(clippy::declare_interior_mutable_const)]
const INIT: Self = Self {
inner: R::INIT,
metrics: None,
Expand Down Expand Up @@ -328,7 +327,6 @@ unsafe impl<R: lock_api::RawRwLockUpgrade + Sized> lock_api::RawRwLockUpgrade
unsafe impl<R: lock_api::RawMutex + Sized> lock_api::RawMutex for InstrumentedRawLock<R> {
// A “non-constant” const item is a legacy way to supply an initialized value to downstream
// static items. Can hopefully be replaced with `const fn new() -> Self` at some point.
#[expect(clippy::declare_interior_mutable_const)]
const INIT: Self = Self {
inner: R::INIT,
metrics: None,
Expand Down