@@ -189,25 +189,59 @@ impl<T> FilterPushdownPropagation<T> {
189189 }
190190}
191191
192+ #[ derive( Debug , Clone ) ]
193+ struct ChildFilterDescription {
194+ /// Description of which parent filters can be pushed down into this node.
195+ /// Since we need to transmit filter pushdown results back to this node's parent
196+ /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
197+ /// We do this using a [`PredicateSupports`] which simplifies manipulating supported/unsupported filters.
198+ parent_filters : PredicateSupports ,
199+ /// Description of which filters this node is pushing down to its children.
200+ /// Since this is not transmitted back to the parents we can have variable sized inner arrays
201+ /// instead of having to track supported/unsupported.
202+ self_filters : Vec < Arc < dyn PhysicalExpr > > ,
203+ }
204+
205+ impl ChildFilterDescription {
206+ fn new ( ) -> Self {
207+ Self {
208+ parent_filters : PredicateSupports :: new ( vec ! [ ] ) ,
209+ self_filters : vec ! [ ] ,
210+ }
211+ }
212+ }
213+
192214#[ derive( Debug , Clone ) ]
193215pub struct FilterDescription {
194- num_children : usize ,
195- /// Vector storing the [`PredicateSupports`] for each child.
196- pub parent_filters : Vec < PredicateSupports > ,
197- /// Vector storing the physical expressions for each child.
198- /// Inner vector is for multiple predicates, if the node stores them such.
199- pub self_filters : Vec < Vec < Arc < dyn PhysicalExpr > > > ,
216+ /// A filter description for each child.
217+ /// This includes which parent filters and which self filters (from the node in question)
218+ /// will get pushed down to each child.
219+ child_filter_descriptions : Vec < ChildFilterDescription > ,
200220}
201221
202222impl FilterDescription {
203223 pub fn new_with_child_count ( num_children : usize ) -> Self {
204224 Self {
205- num_children,
206- parent_filters : Vec :: with_capacity ( num_children) ,
207- self_filters : vec ! [ vec![ ] ; num_children] ,
225+ child_filter_descriptions : vec ! [ ChildFilterDescription :: new( ) ; num_children] ,
208226 }
209227 }
210228
229+ pub fn parent_filters ( & self ) -> Vec < PredicateSupports > {
230+ self . child_filter_descriptions
231+ . iter ( )
232+ . map ( |d| & d. parent_filters )
233+ . cloned ( )
234+ . collect ( )
235+ }
236+
237+ pub fn self_filters ( & self ) -> Vec < Vec < Arc < dyn PhysicalExpr > > > {
238+ self . child_filter_descriptions
239+ . iter ( )
240+ . map ( |d| & d. self_filters )
241+ . cloned ( )
242+ . collect ( )
243+ }
244+
211245 /// Mark all parent filters as supported for all children.
212246 /// This is the case if the node allows filters to be pushed down through it
213247 /// without any modification.
@@ -219,15 +253,14 @@ impl FilterDescription {
219253 ///
220254 /// [`RepartitionExec`]: crate::repartition::RepartitionExec
221255 pub fn all_parent_filters_supported (
222- self ,
256+ mut self ,
223257 parent_filters : Vec < Arc < dyn PhysicalExpr > > ,
224258 ) -> Self {
225259 let supported = PredicateSupports :: all_supported ( parent_filters) ;
226- Self {
227- num_children : self . num_children ,
228- parent_filters : vec ! [ supported; self . num_children] ,
229- self_filters : self . self_filters ,
260+ for child in & mut self . child_filter_descriptions {
261+ child. parent_filters = supported. clone ( ) ;
230262 }
263+ self
231264 }
232265
233266 /// Mark all parent filters as unsupported for all children.
@@ -240,15 +273,14 @@ impl FilterDescription {
240273 ///
241274 /// [`ExecutionPlan`]: crate::ExecutionPlan
242275 pub fn all_parent_filters_unsupported (
243- self ,
276+ mut self ,
244277 parent_filters : Vec < Arc < dyn PhysicalExpr > > ,
245278 ) -> Self {
246279 let unsupported = PredicateSupports :: all_unsupported ( parent_filters) ;
247- Self {
248- num_children : self . num_children ,
249- parent_filters : vec ! [ unsupported; self . num_children] ,
250- self_filters : self . self_filters ,
280+ for child in & mut self . child_filter_descriptions {
281+ child. parent_filters = unsupported. clone ( ) ;
251282 }
283+ self
252284 }
253285
254286 /// Add a filter generated / owned by the current node to be pushed down to all children.
@@ -260,7 +292,9 @@ impl FilterDescription {
260292 /// - `TopK` uses this to push down a single filter to all children, it can use this method.
261293 /// - `HashJoinExec` pushes down a filter only to the probe side, it cannot use this method.
262294 pub fn with_self_filter ( mut self , predicate : Arc < dyn PhysicalExpr > ) -> Self {
263- self . self_filters = vec ! [ vec![ predicate] ; self . num_children] ;
295+ for child in & mut self . child_filter_descriptions {
296+ child. self_filters = vec ! [ Arc :: clone( & predicate) ] ;
297+ }
264298 self
265299 }
266300}
0 commit comments