@@ -21,8 +21,8 @@ use crate::{PhysicalExpr, PhysicalSortExpr};
2121use arrow:: array:: Array ;
2222use arrow:: datatypes:: Schema ;
2323use arrow:: record_batch:: RecordBatch ;
24- use arrow:: row:: { RowConverter , SortField } ;
25- use datafusion_common:: Result ;
24+ use arrow:: row:: { RowConverter , Rows , SortField } ;
25+ use datafusion_common:: { internal_datafusion_err , Result } ;
2626use datafusion_execution:: memory_pool:: MemoryReservation ;
2727use datafusion_physical_expr_common:: sort_expr:: LexOrdering ;
2828use futures:: stream:: { Fuse , StreamExt } ;
@@ -76,8 +76,40 @@ impl FusedStreams {
7676 }
7777}
7878
79+ /// A pair of `Arc<Rows>` that can be reused
80+ #[ derive( Debug ) ]
81+ struct ReusableRows {
82+ // inner[stream_idx] holds a two Arcs:
83+ // at start of a new poll
84+ // .0 is the rows from the previous poll (at start),
85+ // .1 is the one that is being written to
86+ // at end of a poll, .0 will be swapped with .1,
87+ inner : Vec < [ Option < Arc < Rows > > ; 2 ] > ,
88+ }
89+
90+ impl ReusableRows {
91+ // return a Rows for writing,
92+ // does not clone if the existing rows can be reused
93+ fn take_next ( & mut self , stream_idx : usize ) -> Result < Rows > {
94+ Arc :: try_unwrap ( self . inner [ stream_idx] [ 1 ] . take ( ) . unwrap ( ) ) . map_err ( |_| {
95+ internal_datafusion_err ! (
96+ "Rows from RowCursorStream is still in use by consumer"
97+ )
98+ } )
99+ }
100+ // save the Rows
101+ fn save ( & mut self , stream_idx : usize , rows : Arc < Rows > ) {
102+ self . inner [ stream_idx] [ 1 ] = Some ( Arc :: clone ( & rows) ) ;
103+ // swap the curent with the previous one, so that the next poll can reuse the Rows from the previous poll
104+ let [ a, b] = & mut self . inner [ stream_idx] ;
105+ std:: mem:: swap ( a, b) ;
106+ }
107+ }
108+
79109/// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`]
80110/// and computes [`RowValues`] based on the provided [`PhysicalSortExpr`]
111+ /// Note: the stream returns an error if the consumer buffers more than one RowValues (i.e. holds on to two RowValues
112+ /// from the same partition at the same time).
81113#[ derive( Debug ) ]
82114pub struct RowCursorStream {
83115 /// Converter to convert output of physical expressions
@@ -88,6 +120,9 @@ pub struct RowCursorStream {
88120 streams : FusedStreams ,
89121 /// Tracks the memory used by `converter`
90122 reservation : MemoryReservation ,
123+ /// Allocated rows for each partition, we keep two to allow for buffering one
124+ /// in the consumer of the stream
125+ rows : ReusableRows ,
91126}
92127
93128impl RowCursorStream {
@@ -105,26 +140,48 @@ impl RowCursorStream {
105140 } )
106141 . collect :: < Result < Vec < _ > > > ( ) ?;
107142
108- let streams = streams. into_iter ( ) . map ( |s| s. fuse ( ) ) . collect ( ) ;
143+ let streams: Vec < _ > = streams. into_iter ( ) . map ( |s| s. fuse ( ) ) . collect ( ) ;
109144 let converter = RowConverter :: new ( sort_fields) ?;
145+ let mut rows = Vec :: with_capacity ( streams. len ( ) ) ;
146+ for _ in & streams {
147+ // Initialize each stream with an empty Rows
148+ rows. push ( [
149+ Some ( Arc :: new ( converter. empty_rows ( 0 , 0 ) ) ) ,
150+ Some ( Arc :: new ( converter. empty_rows ( 0 , 0 ) ) ) ,
151+ ] ) ;
152+ }
110153 Ok ( Self {
111154 converter,
112155 reservation,
113156 column_expressions : expressions. iter ( ) . map ( |x| Arc :: clone ( & x. expr ) ) . collect ( ) ,
114157 streams : FusedStreams ( streams) ,
158+ rows : ReusableRows { inner : rows } ,
115159 } )
116160 }
117161
118- fn convert_batch ( & mut self , batch : & RecordBatch ) -> Result < RowValues > {
162+ fn convert_batch (
163+ & mut self ,
164+ batch : & RecordBatch ,
165+ stream_idx : usize ,
166+ ) -> Result < RowValues > {
119167 let cols = self
120168 . column_expressions
121169 . iter ( )
122170 . map ( |expr| expr. evaluate ( batch) ?. into_array ( batch. num_rows ( ) ) )
123171 . collect :: < Result < Vec < _ > > > ( ) ?;
124172
125- let rows = self . converter . convert_columns ( & cols) ?;
173+ // At this point, ownership of this Rows should be unique
174+ let mut rows = self . rows . take_next ( stream_idx) ?;
175+
176+ rows. clear ( ) ;
177+
178+ self . converter . append ( & mut rows, & cols) ?;
126179 self . reservation . try_resize ( self . converter . size ( ) ) ?;
127180
181+ let rows = Arc :: new ( rows) ;
182+
183+ self . rows . save ( stream_idx, Arc :: clone ( & rows) ) ;
184+
128185 // track the memory in the newly created Rows.
129186 let mut rows_reservation = self . reservation . new_empty ( ) ;
130187 rows_reservation. try_grow ( rows. size ( ) ) ?;
@@ -146,7 +203,7 @@ impl PartitionedStream for RowCursorStream {
146203 ) -> Poll < Option < Self :: Output > > {
147204 Poll :: Ready ( ready ! ( self . streams. poll_next( cx, stream_idx) ) . map ( |r| {
148205 r. and_then ( |batch| {
149- let cursor = self . convert_batch ( & batch) ?;
206+ let cursor = self . convert_batch ( & batch, stream_idx ) ?;
150207 Ok ( ( cursor, batch) )
151208 } )
152209 } ) )
0 commit comments