diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index fd2b794ed02d..b7125368e72a 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -32,7 +32,7 @@ simd = ["datafusion/simd"]
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
-arrow-flight = { version = "10.0" }
+arrow-flight = { version = "11" }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 52168de96591..e75bd3224fbf 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 anyhow = "1"
-arrow = { version = "10.0" }
-arrow-flight = { version = "10.0" }
+arrow = { version = "11" }
+arrow-flight = { version = "11" }
 async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.6.0" }
 chrono = { version = "0.4", default-features = false }
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index cf5ab179813b..31fd8b002485 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -96,7 +96,8 @@ impl FlightService for BallistaFlightService {
                 ))
             })
             .map_err(|e| from_ballista_err(&e))?;
-        let reader = FileReader::try_new(file).map_err(|e| from_arrow_err(&e))?;
+        let reader =
+            FileReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?;
 
         let (tx, rx): (FlightDataSender, FlightDataReceiver) = channel(2);
 
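Both `FileReader` call sites touched by this change (here and in `sort.rs` further down) gain a second argument because the arrow 11 IPC `FileReader::try_new` now takes an optional column projection; passing `None` keeps the old read-everything behaviour. A minimal sketch under that assumption, with a made-up file path and column indices:

```rust
use std::fs::File;

use arrow::error::Result;
use arrow::ipc::reader::FileReader;

fn read_projected(path: &str) -> Result<()> {
    let file = File::open(path)?;
    // `None` reads every column; `Some(vec![0, 1])` restricts the returned
    // record batches to the first two columns of the schema.
    let reader = FileReader::try_new(file, Some(vec![0, 1]))?;
    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```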
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index c8269088e289..c636f095f94b 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -28,7 +28,7 @@ repository = "https://github.com/apache/arrow-datafusion"
 rust-version = "1.59"
 
 [dependencies]
-arrow = { version = "10.0" }
+arrow = { version = "11" }
 ballista = { path = "../ballista/rust/client", version = "0.6.0", optional = true }
 clap = { version = "3", features = ["derive", "cargo"] }
 datafusion = { path = "../datafusion", version = "7.0.0" }
diff --git a/datafusion-common/Cargo.toml b/datafusion-common/Cargo.toml
index a69d5afd394c..04540ea24569 100644
--- a/datafusion-common/Cargo.toml
+++ b/datafusion-common/Cargo.toml
@@ -38,10 +38,10 @@ jit = ["cranelift-module"]
 pyarrow = ["pyo3"]
 
 [dependencies]
-arrow = { version = "10.0", features = ["prettyprint"] }
+arrow = { version = "11", features = ["prettyprint"] }
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
 cranelift-module = { version = "0.82.0", optional = true }
 ordered-float = "2.10"
-parquet = { version = "10.0", features = ["arrow"], optional = true }
+parquet = { version = "11", features = ["arrow"], optional = true }
 pyo3 = { version = "0.16", optional = true }
 sqlparser = "0.15"
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 0ad984363be1..d0bc5ec52dfb 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
 required-features = ["datafusion/avro"]
 
 [dev-dependencies]
-arrow-flight = { version = "10.0" }
+arrow-flight = { version = "11" }
 async-trait = "0.1.41"
 datafusion = { path = "../datafusion" }
 futures = "0.3"
diff --git a/datafusion-expr/Cargo.toml b/datafusion-expr/Cargo.toml
index 834d59b708ff..de0d227132b8 100644
--- a/datafusion-expr/Cargo.toml
+++ b/datafusion-expr/Cargo.toml
@@ -36,6 +36,6 @@ path = "src/lib.rs"
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
-arrow = { version = "10.0", features = ["prettyprint"] }
+arrow = { version = "11", features = ["prettyprint"] }
 datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
 sqlparser = "0.15"
diff --git a/datafusion-physical-expr/Cargo.toml b/datafusion-physical-expr/Cargo.toml
index 692301b3309f..485af3757d36 100644
--- a/datafusion-physical-expr/Cargo.toml
+++ b/datafusion-physical-expr/Cargo.toml
@@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
-arrow = { version = "10.0", features = ["prettyprint"] }
+arrow = { version = "11", features = ["prettyprint"] }
 blake2 = { version = "^0.10.2", optional = true }
 blake3 = { version = "1.0", optional = true }
 chrono = { version = "0.4", default-features = false }
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index bdfc81e8b032..2bab2b302844 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions"]
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
-arrow = { version = "10.0", features = ["prettyprint"] }
+arrow = { version = "11", features = ["prettyprint"] }
 async-trait = "0.1.41"
 avro-rs = { version = "0.13", features = ["snappy"], optional = true }
 chrono = { version = "0.4", default-features = false }
@@ -71,7 +71,7 @@ num-traits = { version = "0.2", optional = true }
 num_cpus = "1.13.0"
 ordered-float = "2.10"
 parking_lot = "0.12"
-parquet = { version = "10.0", features = ["arrow"] }
+parquet = { version = "11", features = ["arrow"] }
 paste = "^1.0"
 pin-project-lite= "^0.2.7"
 pyo3 = { version = "0.16", optional = true }
diff --git a/datafusion/fuzz-utils/Cargo.toml b/datafusion/fuzz-utils/Cargo.toml
index 71c944180b56..20745867a0c8 100644
--- a/datafusion/fuzz-utils/Cargo.toml
+++ b/datafusion/fuzz-utils/Cargo.toml
@@ -23,6 +23,6 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-arrow = { version = "10.0", features = ["prettyprint"] }
+arrow = { version = "11", features = ["prettyprint"] }
 env_logger = "0.9.0"
 rand = "0.8"
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index c20d0f563b14..c8acd34c7a15 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -53,9 +53,8 @@ use arrow::{
 use log::{debug, warn};
 use parquet::arrow::ArrowWriter;
 use parquet::file::{
-    metadata::RowGroupMetaData,
-    reader::{FileReader, SerializedFileReader},
-    statistics::Statistics as ParquetStatistics,
+    metadata::RowGroupMetaData, reader::SerializedFileReader,
+    serialized_reader::ReadOptionsBuilder, statistics::Statistics as ParquetStatistics,
 };
 
 use fmt::Debug;
@@ -309,7 +308,7 @@ fn send_result(
 /// Wraps parquet statistics in a way
 /// that implements [`PruningStatistics`]
 struct RowGroupPruningStatistics<'a> {
-    row_group_metadata: &'a [RowGroupMetaData],
+    row_group_metadata: &'a RowGroupMetaData,
     parquet_schema: &'a Schema,
 }
 
@@ -342,33 +341,26 @@ macro_rules! get_statistic {
 // Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate
 macro_rules! get_min_max_values {
     ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
-        let (column_index, field) = if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
-            (v, f)
-        } else {
-            // Named column was not present
-            return None
-        };
+        let (column_index, field) =
+            if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
+                (v, f)
+            } else {
+                // Named column was not present
+                return None;
+            };
 
         let data_type = field.data_type();
         // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
         let null_scalar: ScalarValue = data_type.try_into().ok()?;
 
-        let scalar_values : Vec<ScalarValue> = $self.row_group_metadata
-            .iter()
-            .flat_map(|meta| {
-                meta.column(column_index).statistics()
-            })
-            .map(|stats| {
-                get_statistic!(stats, $func, $bytes_func)
-            })
-            .map(|maybe_scalar| {
-                // column either did't have statistics at all or didn't have min/max values
-                maybe_scalar.unwrap_or_else(|| null_scalar.clone())
-            })
-            .collect();
-
-        // ignore errors converting to arrays (e.g. different types)
-        ScalarValue::iter_to_array(scalar_values).ok()
+        $self.row_group_metadata
+            .column(column_index)
+            .statistics()
+            .map(|stats| get_statistic!(stats, $func, $bytes_func))
+            .flatten()
+            // column either didn't have statistics at all or didn't have min/max values
+            .or_else(|| Some(null_scalar.clone()))
+            .map(|s| s.to_array())
     }}
 }
 
@@ -383,17 +375,14 @@ macro_rules! get_null_count_values {
                 return None;
             };
 
-        let scalar_values: Vec<ScalarValue> = $self
-            .row_group_metadata
-            .iter()
-            .flat_map(|meta| meta.column(column_index).statistics())
-            .map(|stats| {
-                ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap()))
-            })
-            .collect();
-
-        // ignore errors converting to arrays (e.g. different types)
-        ScalarValue::iter_to_array(scalar_values).ok()
+        let value = ScalarValue::UInt64(
+            $self
+                .row_group_metadata
+                .column(column_index)
+                .statistics()
+                .map(|s| s.null_count()),
+        );
+        Some(value.to_array())
     }};
 }
 
@@ -407,7 +396,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
     }
 
     fn num_containers(&self) -> usize {
-        self.row_group_metadata.len()
+        1
     }
 
     fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
@@ -418,31 +407,33 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
 fn build_row_group_predicate(
     pruning_predicate: &PruningPredicate,
     metrics: ParquetFileMetrics,
-    row_group_metadata: &[RowGroupMetaData],
-) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
-    let parquet_schema = pruning_predicate.schema().as_ref();
-
-    let pruning_stats = RowGroupPruningStatistics {
-        row_group_metadata,
-        parquet_schema,
-    };
-    let predicate_values = pruning_predicate.prune(&pruning_stats);
-
-    match predicate_values {
-        Ok(values) => {
-            // NB: false means don't scan row group
-            let num_pruned = values.iter().filter(|&v| !*v).count();
-            metrics.row_groups_pruned.add(num_pruned);
-            Box::new(move |_, i| values[i])
-        }
-        // stats filter array could not be built
-        // return a closure which will not filter out any row groups
-        Err(e) => {
-            debug!("Error evaluating row group predicate values {}", e);
-            metrics.predicate_evaluation_errors.add(1);
-            Box::new(|_r, _i| true)
-        }
-    }
+) -> Box<dyn FnMut(&RowGroupMetaData, usize) -> bool> {
+    let pruning_predicate = pruning_predicate.clone();
+    Box::new(
+        move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool {
+            let parquet_schema = pruning_predicate.schema().as_ref();
+            let pruning_stats = RowGroupPruningStatistics {
+                row_group_metadata,
+                parquet_schema,
+            };
+            let predicate_values = pruning_predicate.prune(&pruning_stats);
+            match predicate_values {
+                Ok(values) => {
+                    // NB: false means don't scan row group
+                    let num_pruned = values.iter().filter(|&v| !*v).count();
+                    metrics.row_groups_pruned.add(num_pruned);
+                    values[0]
+                }
+                // stats filter array could not be built
+                // return a closure which will not filter out any row groups
+                Err(e) => {
+                    debug!("Error evaluating row group predicate values {}", e);
+                    metrics.predicate_evaluation_errors.add(1);
+                    true
+                }
+            }
+        },
+    )
 }
 
 #[allow(clippy::too_many_arguments)]
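`build_row_group_predicate` now returns a boxed closure that is evaluated once per row group, which is why `RowGroupPruningStatistics` wraps a single `RowGroupMetaData` and `num_containers` reports 1. A rough sketch of how the closure is driven, mirroring the updated tests later in this diff (`pruning_predicate`, `parquet_file_metrics()` and `row_group_metadata` are the module's own test items, not new API):

```rust
// Sketch only: one call per row group; `false` means the group can be skipped.
let mut keep_row_group =
    build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let kept: Vec<bool> = row_group_metadata
    .iter()
    .enumerate()
    .map(|(i, rg)| keep_row_group(rg, i))
    .collect();
```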
@@ -470,17 +461,20 @@ fn read_partition(
         );
         let object_reader =
             object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
-        let mut file_reader =
-            SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+
+        let mut opt = ReadOptionsBuilder::new();
         if let Some(pruning_predicate) = pruning_predicate {
-            let row_group_predicate = build_row_group_predicate(
+            opt = opt.with_predicate(build_row_group_predicate(
                 pruning_predicate,
                 file_metrics,
-                file_reader.metadata().row_groups(),
-            );
-            file_reader.filter_row_groups(&row_group_predicate);
+            ));
         }
 
+        let file_reader = SerializedFileReader::new_with_options(
+            ChunkObjectReader(object_reader),
+            opt.build(),
+        )?;
+
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
         let adapted_projections =
             schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?;
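For reference, a minimal sketch (not the exact DataFusion code) of the parquet 11 open-time filtering used above: the row-group predicate is handed to `ReadOptionsBuilder::with_predicate` and applied when the `SerializedFileReader` is constructed, replacing the old `filter_row_groups` call on an already-open reader. The file handle and the keep-non-empty-groups predicate below are placeholders, and the predicate signature is inferred from how it is used in this change:

```rust
use std::fs::File;

use parquet::errors::Result;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::reader::SerializedFileReader;
use parquet::file::serialized_reader::ReadOptionsBuilder;

// Open a parquet file while filtering row groups at open time.
fn open_with_row_group_filter(file: File) -> Result<SerializedFileReader<File>> {
    let options = ReadOptionsBuilder::new()
        .with_predicate(Box::new(|rg: &RowGroupMetaData, _i: usize| {
            // Placeholder predicate: keep only non-empty row groups.
            rg.num_rows() > 0
        }))
        .build();
    SerializedFileReader::new_with_options(file, options)
}
```

Because filtering happens when the reader is constructed, pruned row groups never appear in the reader's metadata, so no separate post-construction filtering step is needed.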
@@ -1054,11 +1048,8 @@ mod tests {
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1087,11 +1078,8 @@ mod tests {
             vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1135,11 +1123,8 @@ mod tests {
             ],
         );
         let row_group_metadata = vec![rgm1, rgm2];
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1153,11 +1138,8 @@ mod tests {
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
         let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1202,11 +1184,8 @@ mod tests {
         let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
         let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();
 
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
@@ -1234,11 +1213,8 @@ mod tests {
         let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
         let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();
 
-        let row_group_predicate = build_row_group_predicate(
-            &pruning_predicate,
-            parquet_file_metrics(),
-            &row_group_metadata,
-        );
+        let mut row_group_predicate =
+            build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
         let row_group_filter = row_group_metadata
             .iter()
             .enumerate()
diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs
index b2bf604665a0..396184fdd510 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -358,7 +358,7 @@ fn write_sorted(
 
 fn read_spill(sender: Sender<ArrowResult<RecordBatch>>, path: &Path) -> Result<()> {
     let file = BufReader::new(File::open(&path)?);
-    let reader = FileReader::try_new(file)?;
+    let reader = FileReader::try_new(file, None)?;
     for batch in reader {
         sender
             .blocking_send(batch)
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index c5428e499a68..83d51e071aaa 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -262,7 +262,7 @@ async fn prune_int32_scalar_fun() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 3, "{}", output.description());
 }
@@ -278,7 +278,7 @@ async fn prune_int32_complex_expr() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 2, "{}", output.description());
 }
@@ -294,7 +294,7 @@ async fn prune_int32_complex_expr_subtract() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 9, "{}", output.description());
 }
@@ -366,7 +366,7 @@ async fn prune_f64_scalar_fun() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
    // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 1, "{}", output.description());
 }
@@ -382,7 +382,7 @@ async fn prune_f64_complex_expr() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 9, "{}", output.description());
 }
@@ -398,7 +398,7 @@ async fn prune_f64_complex_expr_subtract() {
     println!("{}", output.description());
     // This should prune out groups with error, because there is not col to
     // prune the row groups.
-    assert_eq!(output.predicate_evaluation_errors(), Some(1));
+    assert_eq!(output.predicate_evaluation_errors(), Some(4));
     assert_eq!(output.row_groups_pruned(), Some(0));
     assert_eq!(output.result_rows, 9, "{}", output.description());
 }
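The expected counts above change because the pruning predicate is now evaluated once per row group (through the read options predicate) instead of once per file scan, so an expression the pruner cannot evaluate records one `predicate_evaluation_errors` increment per row group; the move from `Some(1)` to `Some(4)` suggests these test files contain four row groups.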