diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index 7d75f7f1cd..b2546f839d 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -65,6 +65,8 @@ pub struct ScanExec {
     pub input_source_description: String,
     /// The data types of columns of the input batch. Converted from Spark schema.
     pub data_types: Vec<DataType>,
+    /// Schema of first batch
+    pub schema: SchemaRef,
     /// The input batch of input data. Used to determine the schema of the input data.
     /// It is also used in unit test to mock the input data from JVM.
     pub batch: Arc<Mutex<Option<InputBatch>>>,
@@ -72,6 +74,7 @@ pub struct ScanExec {
     cache: PlanProperties,
     /// Metrics collector
    metrics: ExecutionPlanMetricsSet,
+    baseline_metrics: BaselineMetrics,
 }
 
 impl ScanExec {
@@ -81,6 +84,9 @@ impl ScanExec {
         input_source_description: &str,
         data_types: Vec<DataType>,
     ) -> Result<Self, CometError> {
+        let metrics_set = ExecutionPlanMetricsSet::default();
+        let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
+
         // Scan's schema is determined by the input batch, so we need to set it before execution.
         // Note that we determine if arrays are dictionary-encoded based on the
         // first batch. The array may be dictionary-encoded in some batches and not others, and
@@ -88,7 +94,12 @@
         // may end up either unpacking dictionary arrays or dictionary-encoding arrays.
         // Dictionary-encoded primitive arrays are always unpacked.
         let first_batch = if let Some(input_source) = input_source.as_ref() {
-            ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?
+            let mut timer = baseline_metrics.elapsed_compute().timer();
+            let batch =
+                ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?;
+            timer.stop();
+            baseline_metrics.record_output(batch.num_rows());
+            batch
         } else {
             InputBatch::EOF
         };
@@ -96,7 +107,7 @@
         let schema = scan_schema(&first_batch, &data_types);
 
         let cache = PlanProperties::new(
-            EquivalenceProperties::new(schema),
+            EquivalenceProperties::new(Arc::clone(&schema)),
             // The partitioning is not important because we are not using DataFusion's
             // query planner or optimizer
             Partitioning::UnknownPartitioning(1),
@@ -110,7 +121,9 @@
             data_types,
             batch: Arc::new(Mutex::new(Some(first_batch))),
             cache,
-            metrics: ExecutionPlanMetricsSet::default(),
+            metrics: metrics_set,
+            baseline_metrics,
+            schema,
         })
     }
 
@@ -276,11 +289,15 @@ impl ExecutionPlan for ScanExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        // `unwrap` is safe because `schema` is only called during converting
-        // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
-        let binding = self.batch.try_lock().unwrap();
-        let input_batch = binding.as_ref().unwrap();
-        scan_schema(input_batch, &self.data_types)
+        if self.exec_context_id == TEST_EXEC_CONTEXT_ID {
+            // `unwrap` is safe because `schema` is only called during converting
+            // Spark plan to DataFusion plan. At the moment, `batch` is not EOF.
+            let binding = self.batch.try_lock().unwrap();
+            let input_batch = binding.as_ref().unwrap();
+            scan_schema(input_batch, &self.data_types)
+        } else {
+            Arc::clone(&self.schema)
+        }
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
@@ -303,6 +320,7 @@
             self.clone(),
             self.schema(),
             partition,
+            self.baseline_metrics.clone(),
         )))
     }
 
@@ -352,8 +370,12 @@ struct ScanStream<'a> {
 }
 
 impl<'a> ScanStream<'a> {
-    pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize) -> Self {
-        let baseline_metrics = BaselineMetrics::new(&scan.metrics, partition);
+    pub fn new(
+        scan: ScanExec,
+        schema: SchemaRef,
+        partition: usize,
+        baseline_metrics: BaselineMetrics,
+    ) -> Self {
         let cast_time = MetricBuilder::new(&scan.metrics).subset_time("cast_time", partition);
         Self {
             scan,
@@ -465,4 +487,12 @@ impl InputBatch {
 
         InputBatch::Batch(columns, num_rows)
     }
+
+    /// Get the number of rows in this batch
+    fn num_rows(&self) -> usize {
+        match self {
+            Self::EOF => 0,
+            Self::Batch(_, num_rows) => *num_rows,
+        }
+    }
 }
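
For context, the pattern this patch applies in ScanExec::new is DataFusion's standard baseline-metrics idiom: wrap the work in an elapsed_compute() timer and report row counts through record_output(). Below is a minimal standalone sketch of that idiom, not part of the patch; fetch_rows is a hypothetical stand-in for the JVM-backed ScanExec::get_next call.

use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

// Hypothetical stand-in for ScanExec::get_next, which pulls a batch from the JVM.
fn fetch_rows() -> Vec<i32> {
    vec![1, 2, 3]
}

fn main() {
    // One shared metrics set per operator; one BaselineMetrics per partition.
    let metrics_set = ExecutionPlanMetricsSet::new();
    let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);

    // Same shape as the patched ScanExec::new: time the fetch, then record the rows produced.
    let mut timer = baseline_metrics.elapsed_compute().timer();
    let batch = fetch_rows();
    timer.stop();
    baseline_metrics.record_output(batch.len());

    // The aggregated metrics (elapsed_compute, output_rows) are now visible to callers.
    println!("{}", metrics_set.clone_inner().aggregate_by_name());
}

Note the design choice this enables: by creating the BaselineMetrics once in ScanExec::new and passing it into ScanStream::new (rather than building a fresh one per stream, as before), the time spent fetching the first batch during plan construction is counted toward the same elapsed_compute and output_rows totals as the rest of the scan.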