File tree Expand file tree Collapse file tree 1 file changed +30
-3
lines changed
Expand file tree Collapse file tree 1 file changed +30
-3
lines changed Original file line number Diff line number Diff line change @@ -321,18 +321,45 @@ impl DataStream {
321321 self . batch_size
322322 }
323323
324- fn next_batch ( & mut self ) -> & RecordBatch {
324+ /// Return the next batch.
325+ ///
326+ /// Note we don't inline this so it is easier to profile where the time is being
327+ /// spent in the benchmark
328+ #[ inline( never) ]
329+ fn next_batch ( & mut self ) -> RecordBatch {
325330 let current_index = self . index ;
326331 self . index += 1 ;
327332 if self . index >= self . batches . len ( ) {
328333 self . index = 0 ; // loop back to the start
329334 }
330- self . batches
335+ let batch = self
336+ . batches
331337 . get ( current_index)
332- . expect ( "No more batches available" )
338+ . expect ( "No more batches available" ) ;
339+ deep_copy_batch ( batch)
333340 }
334341}
335342
343+ /// Copies the underlying buffers of a RecordBatch so that the returned
344+ /// batch does not share any buffers with the input batch.
345+ ///
346+ /// This models a realistic scenario where the input batch is created from
347+ /// a data source and then is filtered / not reused.
348+ fn deep_copy_batch ( batch : & RecordBatch ) -> RecordBatch {
349+ let columns = batch
350+ . columns ( )
351+ . iter ( )
352+ . map ( |array| {
353+ let src_data = array. into_data ( ) ;
354+ let mut copy = MutableArrayData :: new ( vec ! [ & src_data] , true , src_data. len ( ) ) ;
355+ copy. extend ( 0 , 0 , src_data. len ( ) ) ;
356+ let new_data = copy. freeze ( ) ;
357+ make_array ( new_data)
358+ } )
359+ . collect :: < Vec < _ > > ( ) ;
360+ RecordBatch :: try_new ( batch. schema ( ) , columns) . unwrap ( )
361+ }
362+
336363#[ derive( Debug , Clone ) ]
337364struct DataStreamBuilder {
338365 schema : SchemaRef ,
You can’t perform that action at this time.
0 commit comments