Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 61 additions & 38 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
/// details on partitioning.
///
/// Methods such as [`Self::schema`] and [`ExecutionPlanProperties::output_partitioning`] communicate
/// properties of this output to the DataFusion optimizer, and methods such as
/// Methods such as [`Self::schema`] and [`Self::properties`] communicate
/// properties of the output to the DataFusion optimizer, and methods such as
/// [`required_input_distribution`] and [`required_input_ordering`] express
/// requirements of the `ExecutionPlan` from its input.
///
Expand All @@ -123,7 +123,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
self.properties().schema().clone()
}

/// Gets plan properties, such as output ordering(s), partitioning information etc.
/// Return properties of the output of the `ExecutionPlan`, such as output
/// ordering(s), partitioning information etc.
///
/// This information is available via methods on [`ExecutionPlanProperties`]
/// trait, which is implemented for all `ExecutionPlan`s.
fn properties(&self) -> &PlanProperties;

/// Specifies the data distribution requirements for all the
Expand Down Expand Up @@ -401,31 +405,17 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
}

/// This extension trait provides an API to fetch various properties of
/// [`ExecutionPlan`] objects.
/// Extension trait provides an easy API to fetch various properties of
/// [`ExecutionPlan`] objects based on [`ExecutionPlan::properties`].
pub trait ExecutionPlanProperties {
fn output_partitioning(&self) -> &Partitioning;

fn execution_mode(&self) -> ExecutionMode;

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;

fn equivalence_properties(&self) -> &EquivalenceProperties;
}

impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
/// Specifies how the output of this `ExecutionPlan` is split into
/// partitions.
fn output_partitioning(&self) -> &Partitioning {
self.properties().output_partitioning()
}
fn output_partitioning(&self) -> &Partitioning;

/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns [`ExecutionMode::PipelineBreaking`] to indicate this.
fn execution_mode(&self) -> ExecutionMode {
self.properties().execution_mode()
}
fn execution_mode(&self) -> ExecutionMode;

/// If the output of this `ExecutionPlan` within each partition is sorted,
/// returns `Some(keys)` describing the ordering. A `None` return value
Expand All @@ -434,9 +424,7 @@ impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
/// For example, `SortExec` (obviously) produces sorted output as does
/// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
/// output if its input is sorted as it does not reorder the input rows.
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.properties().output_ordering()
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;

/// Get the [`EquivalenceProperties`] within the plan.
///
Expand All @@ -455,6 +443,40 @@ impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
///
/// See also [`ExecutionPlan::maintains_input_order`] and [`Self::output_ordering`]
/// for related concepts.
fn equivalence_properties(&self) -> &EquivalenceProperties;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

github rendered the diff strangely -- what I did was move the comments into the trait

}

impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
impl ExecutionPlanProperties for dyn ExecutionPlan {

Maybe doing this will make the extension trait a bit more generally applicable?

Copy link
Contributor Author

@alamb alamb Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 839d783

It turns out I had to also leave the

impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {

Otherwise the compiler didn't seem to be able to find the relevant methods without code changes (e.g. execution_plan.as_ref()....)

fn output_partitioning(&self) -> &Partitioning {
self.properties().output_partitioning()
}

fn execution_mode(&self) -> ExecutionMode {
self.properties().execution_mode()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.properties().output_ordering()
}

fn equivalence_properties(&self) -> &EquivalenceProperties {
self.properties().equivalence_properties()
}
}

impl ExecutionPlanProperties for &dyn ExecutionPlan {
fn output_partitioning(&self) -> &Partitioning {
self.properties().output_partitioning()
}

fn execution_mode(&self) -> ExecutionMode {
self.properties().execution_mode()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.properties().output_ordering()
}

fn equivalence_properties(&self) -> &EquivalenceProperties {
self.properties().equivalence_properties()
}
Expand Down Expand Up @@ -519,20 +541,21 @@ fn execution_mode_from_children<'a>(
result
}

/// Stores the plan properties used in query optimization.
/// Stores certain, often expensive to compute, plan properties used in query
/// optimization.
///
/// These properties are in a single structure to permit this information to be computed
/// once and then those cached results used multiple times without recomputation (aka a cache)
/// These properties are stored a single structure to permit this information to
/// be computed once and then those cached results used multiple times without
/// recomputation (aka a cache)
#[derive(Debug, Clone)]
pub struct PlanProperties {
/// Stores the [`EquivalenceProperties`] of the [`ExecutionPlan`].
/// See [ExecutionPlanProperties::equivalence_properties]
pub eq_properties: EquivalenceProperties,
/// Stores the output [`Partitioning`] of the [`ExecutionPlan`].
/// See [ExecutionPlanProperties::output_partitioning]
pub partitioning: Partitioning,
/// Stores the [`ExecutionMode`] of the [`ExecutionPlan`].
pub exec_mode: ExecutionMode,
/// Stores output ordering of the [`ExecutionPlan`]. A `None` value represents
/// no ordering.
/// See [ExecutionPlanProperties::execution_mode]
pub execution_mode: ExecutionMode,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A drive by change was to rename this field to match the trait

/// See [ExecutionPlanProperties::output_ordering]
output_ordering: Option<LexOrdering>,
}

Expand All @@ -541,14 +564,14 @@ impl PlanProperties {
pub fn new(
eq_properties: EquivalenceProperties,
partitioning: Partitioning,
exec_mode: ExecutionMode,
execution_mode: ExecutionMode,
) -> Self {
// Output ordering can be derived from `eq_properties`.
let output_ordering = eq_properties.oeq_class().output_ordering();
Self {
eq_properties,
partitioning,
exec_mode,
execution_mode,
output_ordering,
}
}
Expand All @@ -560,8 +583,8 @@ impl PlanProperties {
}

/// Overwrite the execution Mode with its new value.
pub fn with_exec_mode(mut self, exec_mode: ExecutionMode) -> Self {
self.exec_mode = exec_mode;
pub fn with_execution_mode(mut self, execution_mode: ExecutionMode) -> Self {
self.execution_mode = execution_mode;
self
}

Expand All @@ -587,7 +610,7 @@ impl PlanProperties {
}

pub fn execution_mode(&self) -> ExecutionMode {
self.exec_mode
self.execution_mode
}

/// Get schema of the node.
Expand Down