diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 11eb9e7867bb..d6546539993b 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -59,7 +59,7 @@ use crate::{ listing::{FileRange, PartitionedFile}, object_store::ObjectStoreUrl, }, - physical_plan::display::{OutputOrderingDisplay, ProjectSchemaDisplay}, + physical_plan::display::{display_orderings, ProjectSchemaDisplay}, }; use arrow::{ @@ -129,26 +129,7 @@ impl DisplayAs for FileScanConfig { write!(f, ", limit={limit}")?; } - if let Some(ordering) = orderings.first() { - if !ordering.is_empty() { - let start = if orderings.len() == 1 { - ", output_ordering=" - } else { - ", output_orderings=[" - }; - write!(f, "{}", start)?; - for (idx, ordering) in - orderings.iter().enumerate().filter(|(_, o)| !o.is_empty()) - { - match idx { - 0 => write!(f, "{}", OutputOrderingDisplay(ordering))?, - _ => write!(f, ", {}", OutputOrderingDisplay(ordering))?, - } - } - let end = if orderings.len() == 1 { "" } else { "]" }; - write!(f, "{}", end)?; - } - } + display_orderings(f, &orderings)?; Ok(()) } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 19c2847b09dc..ff106dceb974 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -19,12 +19,13 @@ //! [`crate::displayable`] for examples of how to format use std::fmt; +use std::fmt::Formatter; use super::{accept, ExecutionPlan, ExecutionPlanVisitor}; use arrow_schema::SchemaRef; use datafusion_common::display::{GraphvizBuilder, PlanType, StringifiedPlan}; -use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; /// Options for controlling how each [`ExecutionPlan`] should format itself #[derive(Debug, Clone, Copy)] @@ -437,6 +438,31 @@ impl<'a> fmt::Display for OutputOrderingDisplay<'a> { } } +pub fn display_orderings(f: &mut Formatter, orderings: &[LexOrdering]) -> fmt::Result { + if let Some(ordering) = orderings.first() { + if !ordering.is_empty() { + let start = if orderings.len() == 1 { + ", output_ordering=" + } else { + ", output_orderings=[" + }; + write!(f, "{}", start)?; + for (idx, ordering) in + orderings.iter().enumerate().filter(|(_, o)| !o.is_empty()) + { + match idx { + 0 => write!(f, "{}", OutputOrderingDisplay(ordering))?, + _ => write!(f, ", {}", OutputOrderingDisplay(ordering))?, + } + } + let end = if orderings.len() == 1 { "" } else { "]" }; + write!(f, "{}", end)?; + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use std::fmt::Write; diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 59819c6921fb..897682092831 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -21,7 +21,7 @@ use std::any::Any; use std::sync::Arc; use super::{DisplayAs, DisplayFormatType}; -use crate::display::{OutputOrderingDisplay, ProjectSchemaDisplay}; +use crate::display::{display_orderings, ProjectSchemaDisplay}; use crate::stream::RecordBatchStreamAdapter; use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; @@ -149,18 +149,9 @@ impl DisplayAs for StreamingTableExec { write!(f, ", infinite_source=true")?; } - self.projected_output_ordering - .first() - .map_or(Ok(()), |ordering| { - if !ordering.is_empty() { - write!( - f, - ", output_ordering={}", - OutputOrderingDisplay(ordering) - )?; - } - Ok(()) - }) + display_orderings(f, &self.projected_output_ordering)?; + + Ok(()) } } } diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index fa4445d4cd4c..5a610c16bc7f 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3582,7 +3582,7 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST] ------CoalesceBatchesExec: target_batch_size=4096 --------RepartitionExec: partitioning=Hash([d@4], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST] +------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]] # CTAS with NTILE function statement ok