diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index b1482a9699d56..2840d3f62bf93 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -154,24 +154,19 @@ async fn load_left_input( let stream = merge.execute(0, context)?; // Load all batches and count the rows - let (batches, _num_rows, _, reservation) = stream - .try_fold( - (Vec::new(), 0usize, metrics, reservation), - |mut acc, batch| async { - let batch_size = batch.get_array_memory_size(); - // Reserve memory for incoming batch - acc.3.try_grow(batch_size)?; - // Update metrics - acc.2.build_mem_used.add(batch_size); - acc.2.build_input_batches.add(1); - acc.2.build_input_rows.add(batch.num_rows()); - // Update rowcount - acc.1 += batch.num_rows(); - // Push batch to output - acc.0.push(batch); - Ok(acc) - }, - ) + let (batches, _metrics, reservation) = stream + .try_fold((Vec::new(), metrics, reservation), |mut acc, batch| async { + let batch_size = batch.get_array_memory_size(); + // Reserve memory for incoming batch + acc.2.try_grow(batch_size)?; + // Update metrics + acc.1.build_mem_used.add(batch_size); + acc.1.build_input_batches.add(1); + acc.1.build_input_rows.add(batch.num_rows()); + // Push batch to output + acc.0.push(batch); + Ok(acc) + }) .await?; let merged_batch = concat_batches(&left_schema, &batches)?; diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index eac135bfd0fe3..9f1465c2d7c1a 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -364,19 +364,17 @@ async fn collect_left_input( let stream = merge.execute(0, context)?; // Load all batches and count the rows - let (batches, _num_rows, metrics, mut reservation) = stream + let (batches, metrics, mut reservation) = stream .try_fold( - (Vec::new(), 0usize, join_metrics, reservation), + (Vec::new(), join_metrics, reservation), |mut acc, batch| async { let batch_size = batch.get_array_memory_size(); // Reserve memory for incoming batch - acc.3.try_grow(batch_size)?; + acc.2.try_grow(batch_size)?; // Update metrics - acc.2.build_mem_used.add(batch_size); - acc.2.build_input_batches.add(1); - acc.2.build_input_rows.add(batch.num_rows()); - // Update rowcount - acc.1 += batch.num_rows(); + acc.1.build_mem_used.add(batch_size); + acc.1.build_input_batches.add(1); + acc.1.build_input_rows.add(batch.num_rows()); // Push batch to output acc.0.push(batch); Ok(acc)