@@ -41,8 +41,6 @@ use crate::coalesce_partitions::CoalescePartitionsExec;
4141use crate :: display:: DisplayableExecutionPlan ;
4242use crate :: metrics:: MetricsSet ;
4343use crate :: projection:: ProjectionExec ;
44- use crate :: repartition:: RepartitionExec ;
45- use crate :: sorts:: sort_preserving_merge:: SortPreservingMergeExec ;
4644use crate :: stream:: RecordBatchStreamAdapter ;
4745
4846use arrow:: array:: { Array , RecordBatch } ;
@@ -559,16 +557,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
559557 child_pushdown_result,
560558 ) )
561559 }
562-
563- /// Returns a version of this plan that cooperates with the runtime via
564- /// built‐in yielding. If such a version doesn't exist, returns `None`.
565- /// You do not need to do provide such a version of a custom operator,
566- /// but DataFusion will utilize it while optimizing the plan if it exists.
567- fn with_cooperative_yields ( self : Arc < Self > ) -> Option < Arc < dyn ExecutionPlan > > {
568- // Conservative default implementation assumes that a leaf does not
569- // cooperate with yielding.
570- None
571- }
572560}
573561
574562/// [`ExecutionPlan`] Invariant Level
@@ -743,6 +731,26 @@ pub enum EmissionType {
743731 Both ,
744732}
745733
734+ #[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
735+ pub enum SchedulingType {
736+ /// The stream generated by [`execute`](ExecutionPlan::execute) does not participate in cooperative scheduling
737+ Blocking ,
738+ /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in cooperative scheduling
739+ /// by consuming task budget
740+ Cooperative ,
741+ }
742+
743+ #[ derive( Debug , Clone , Copy , PartialEq , Eq ) ]
744+ pub enum EvaluationType {
745+ /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
746+ /// instances when it is demanded by invoking `Stream::poll_next`.
747+ Lazy ,
748+ /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
749+ /// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
750+ /// `Stream::poll_next` is called.
751+ Eager ,
752+ }
753+
746754/// Utility to determine an operator's boundedness based on its children's boundedness.
747755///
748756/// Assumes boundedness can be inferred from child operators:
@@ -831,6 +839,8 @@ pub struct PlanProperties {
831839 pub emission_type : EmissionType ,
832840 /// See [ExecutionPlanProperties::boundedness]
833841 pub boundedness : Boundedness ,
842+ pub evaluation_type : EvaluationType ,
843+ pub scheduling_type : SchedulingType ,
834844 /// See [ExecutionPlanProperties::output_ordering]
835845 output_ordering : Option < LexOrdering > ,
836846}
@@ -850,6 +860,8 @@ impl PlanProperties {
850860 partitioning,
851861 emission_type,
852862 boundedness,
863+ evaluation_type : EvaluationType :: Lazy ,
864+ scheduling_type : SchedulingType :: Blocking ,
853865 output_ordering,
854866 }
855867 }
@@ -881,6 +893,16 @@ impl PlanProperties {
881893 self
882894 }
883895
896+ pub fn with_scheduling_type ( mut self , scheduling_type : SchedulingType ) -> Self {
897+ self . scheduling_type = scheduling_type;
898+ self
899+ }
900+
901+ pub fn with_evaluation_type ( mut self , drive_type : EvaluationType ) -> Self {
902+ self . evaluation_type = drive_type;
903+ self
904+ }
905+
884906 /// Overwrite constraints with its new value.
885907 pub fn with_constraints ( mut self , constraints : Constraints ) -> Self {
886908 self . eq_properties = self . eq_properties . with_constraints ( constraints) ;
@@ -912,25 +934,7 @@ impl PlanProperties {
912934/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
913935/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
914936pub fn need_data_exchange ( plan : Arc < dyn ExecutionPlan > ) -> bool {
915- if let Some ( repartition) = plan. as_any ( ) . downcast_ref :: < RepartitionExec > ( ) {
916- !matches ! (
917- repartition. properties( ) . output_partitioning( ) ,
918- Partitioning :: RoundRobinBatch ( _)
919- )
920- } else if let Some ( coalesce) = plan. as_any ( ) . downcast_ref :: < CoalescePartitionsExec > ( )
921- {
922- coalesce. input ( ) . output_partitioning ( ) . partition_count ( ) > 1
923- } else if let Some ( sort_preserving_merge) =
924- plan. as_any ( ) . downcast_ref :: < SortPreservingMergeExec > ( )
925- {
926- sort_preserving_merge
927- . input ( )
928- . output_partitioning ( )
929- . partition_count ( )
930- > 1
931- } else {
932- false
933- }
937+ plan. properties ( ) . evaluation_type == EvaluationType :: Lazy
934938}
935939
936940/// Returns a copy of this plan if we change any child according to the pointer comparison.
0 commit comments