Skip to content

Commit ff3b262

Browse files
committed
fix
1 parent 3ec4d63 commit ff3b262

1 file changed

Lines changed: 6 additions & 1 deletion

File tree

native/core/src/execution/datafusion/shuffle_writer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use datafusion::{
5656
};
5757
use datafusion_physical_expr::EquivalenceProperties;
5858
use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
59+
use futures::executor::block_on;
5960
use itertools::Itertools;
6061
use simd_adler32::Adler32;
6162
use tokio::task;
@@ -1111,7 +1112,11 @@ async fn external_shuffle(
11111112
);
11121113

11131114
while let Some(batch) = input.next().await {
1114-
repartitioner.insert_batch(batch?).await?;
1115+
// Block on the repartitioner to insert the batch and shuffle the rows
1116+
// into the corresponding partition buffer.
1117+
// Otherwise, pull the next batch from the input stream might overwrite the
1118+
// current batch in the repartitioner.
1119+
block_on(repartitioner.insert_batch(batch?))?;
11151120
}
11161121
repartitioner.shuffle_write().await
11171122
}

0 commit comments

Comments
 (0)