7 changes: 7 additions & 0 deletions datafusion/common/src/config.rs
@@ -261,10 +261,17 @@ config_namespace! {
/// in parallel using the provided `target_partitions` level
pub repartition_aggregations: bool, default = true

/// Minimum total file size in bytes required to perform file scan repartitioning.
pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

/// Should DataFusion repartition data using the join keys to execute joins in parallel
/// using the provided `target_partitions` level
pub repartition_joins: bool, default = true

/// When set to true, file groups will be repartitioned to achieve maximum parallelism.
/// Currently supported only for the Parquet format.
pub repartition_file_scans: bool, default = false

/// Should DataFusion repartition data using the partitions keys to execute window
/// functions in parallel using the provided `target_partitions` level
pub repartition_windows: bool, default = true
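For reference, a minimal sketch of how these two new options can be set programmatically, assuming the `ConfigOptions` API also used in the tests below (the 64 MiB threshold is purely illustrative):

use datafusion::config::ConfigOptions;

fn scan_repartitioning_options() -> ConfigOptions {
    let mut options = ConfigOptions::new();
    // Enable splitting of Parquet scans into byte ranges...
    options.optimizer.repartition_file_scans = true;
    // ...but only when the scan covers at least 64 MiB in total (illustrative value).
    options.optimizer.repartition_file_min_size = 64 * 1024 * 1024;
    options
}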
12 changes: 12 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -1251,6 +1251,18 @@ impl SessionConfig {
self
}

/// Sets the minimum total file size in bytes required to repartition file scans
pub fn with_repartition_file_min_size(mut self, size: usize) -> Self {
self.options.optimizer.repartition_file_min_size = size;
self
}

/// Enables or disables the use of repartitioning for file scans
pub fn with_repartition_file_scans(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_file_scans = enabled;
self
}

/// Enables or disables the use of repartitioning for window functions to improve parallelism
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
self.options.optimizer.repartition_windows = enabled;
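A hedged usage sketch of the new `SessionConfig` builder methods (the partition count and the 100 MiB threshold are illustrative, and `SessionContext::with_config` is assumed as the context constructor):

use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_scan_repartitioning() -> SessionContext {
    // Ask the optimizer to split eligible Parquet scans across 8 partitions,
    // but only when the scan covers at least 100 MiB of data (illustrative values).
    let config = SessionConfig::new()
        .with_target_partitions(8)
        .with_repartition_file_scans(true)
        .with_repartition_file_min_size(100 * 1024 * 1024);
    SessionContext::with_config(config)
}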
245 changes: 236 additions & 9 deletions datafusion/core/src/physical_optimizer/repartition.rs
@@ -23,7 +23,8 @@ use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::Partitioning::*;
use crate::physical_plan::{
repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
file_format::ParquetExec, repartition::RepartitionExec,
with_new_children_if_necessary, ExecutionPlan,
};

/// Optimizer that introduces repartition to introduce more
@@ -167,6 +168,8 @@ fn optimize_partitions(
is_root: bool,
can_reorder: bool,
would_benefit: bool,
repartition_file_scans: bool,
repartition_file_min_size: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
// Recurse into children bottom-up (attempt to repartition as
// early as possible)
@@ -199,6 +202,8 @@ fn optimize_partitions(
false, // child is not root
can_reorder_child,
plan.benefits_from_input_partitioning(),
repartition_file_scans,
repartition_file_min_size,
)
})
.collect::<Result<_>>()?;
@@ -227,14 +232,28 @@ fn optimize_partitions(
could_repartition = false;
}

if would_benefit && could_repartition && can_reorder {
Ok(Arc::new(RepartitionExec::try_new(
new_plan,
RoundRobinBatch(target_partitions),
)?))
} else {
Ok(new_plan)
let repartition_allowed = would_benefit && could_repartition && can_reorder;

// If repartitioning is not allowed, return the plan as-is
if !repartition_allowed {
return Ok(new_plan);
}

// For ParquetExec, return an internally repartitioned version of the plan if `repartition_file_scans` is set
if let Some(parquet_exec) = new_plan.as_any().downcast_ref::<ParquetExec>() {
if repartition_file_scans {
return Ok(Arc::new(
parquet_exec
.get_repartitioned(target_partitions, repartition_file_min_size),
));
}
}

// Otherwise, return the plan wrapped in a RepartitionExec
Ok(Arc::new(RepartitionExec::try_new(
new_plan,
RoundRobinBatch(target_partitions),
)?))
}

/// Returns true if `plan` requires any of inputs to be sorted in some
@@ -253,6 +272,8 @@ impl PhysicalOptimizerRule for Repartition {
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.execution.target_partitions;
let enabled = config.optimizer.enable_round_robin_repartition;
let repartition_file_scans = config.optimizer.repartition_file_scans;
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
// Don't run optimizer if target_partitions == 1
if !enabled || target_partitions == 1 {
Ok(plan)
@@ -266,6 +287,8 @@
is_root,
can_reorder,
would_benefit,
repartition_file_scans,
repartition_file_min_size,
)
}
}
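The rewrite itself happens inside `ParquetExec::get_repartitioned` (called from `optimize_partitions` above), which is not part of this diff. As a rough, assumed sketch of the idea (not the actual implementation): divide the combined byte size of the scanned files into `target_partitions` contiguous ranges, and leave the file groups untouched when the scan is smaller than `repartition_file_min_size`:

fn split_into_ranges(
    files: &[(&str, u64)], // (path, file size in bytes)
    target_partitions: u64,
    min_size: u64,
) -> Option<Vec<Vec<(String, u64, u64)>>> {
    let total: u64 = files.iter().map(|(_, size)| *size).sum();
    if total < min_size || target_partitions < 2 {
        return None; // keep the original file groups untouched
    }
    // Target number of bytes per output partition (rounded up).
    let chunk = (total + target_partitions - 1) / target_partitions;
    let mut groups = vec![Vec::new(); target_partitions as usize];
    let mut consumed = 0u64; // bytes assigned so far, across all files
    for (path, size) in files {
        let mut offset = 0u64;
        while offset < *size {
            let group = (consumed / chunk) as usize;
            // Take whatever is left of the current chunk or the current file.
            let take = (chunk - consumed % chunk).min(*size - offset);
            groups[group].push((path.to_string(), offset, offset + take));
            offset += take;
            consumed += take;
        }
    }
    Some(groups)
}

Under these assumptions, a single 100-byte file split across two target partitions yields the `{2 groups: [[x:0..50], [x:50..100]]}` grouping asserted by `parallelization_single_partition` in the tests below.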
@@ -331,6 +354,28 @@ mod tests {
))
}

/// Create a non-sorted parquet exec over two files / partitions
fn parquet_exec_two_partitions() -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema(),
file_groups: vec![
vec![PartitionedFile::new("x".to_string(), 100)],
vec![PartitionedFile::new("y".to_string(), 200)],
],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
},
None,
None,
))
}

// Create a sorted parquet exec
fn parquet_exec_sorted() -> Arc<ParquetExec> {
let sort_exprs = vec![PhysicalSortExpr {
@@ -448,10 +493,16 @@ mod tests {
/// Runs the repartition optimizer and asserts the plan against the expected
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
assert_optimized!($EXPECTED_LINES, $PLAN, 10, false, 1024);
};

($EXPECTED_LINES: expr, $PLAN: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.execution.target_partitions = 10;
config.execution.target_partitions = $TARGET_PARTITIONS;
config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;

// run optimizer
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
@@ -846,6 +897,182 @@ mod tests {
Ok(())
}

#[test]
Review comment (Contributor): Love the test coverage

fn parallelization_single_partition() -> Result<()> {
let plan = aggregate(parquet_exec());

let expected = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"ParquetExec: limit=None, partitions={2 groups: [[x:0..50], [x:50..100]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_two_partitions() -> Result<()> {
let plan = aggregate(parquet_exec_two_partitions());

let expected = [
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
// Plan already has two partitions
"ParquetExec: limit=None, partitions={2 groups: [[x], [y]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_sorted_limit() -> Result<()> {
let plan = limit_exec(sort_exec(parquet_exec(), false));

let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
// data is sorted so can't repartition here
"SortExec: [c1@0 ASC]",
// Doesn't parallelize for SortExec without preserve_partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_limit_with_filter() -> Result<()> {
let plan = limit_exec(filter_exec(sort_exec(parquet_exec(), false)));

let expected = &[
"GlobalLimitExec: skip=0, fetch=100",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// data is sorted so we can't repartition here; even though the
// filter would benefit from parallelism, the answers might be wrong
"SortExec: [c1@0 ASC]",
// SortExec doesn't benefit from input partitioning
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_ignores_limit() -> Result<()> {
let plan = aggregate(limit_exec(filter_exec(limit_exec(parquet_exec()))));

let expected = &[
"AggregateExec: mode=Final, gby=[], aggr=[]",
"CoalescePartitionsExec",
"AggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
"CoalescePartitionsExec",
"LocalLimitExec: fetch=100",
"FilterExec: c1@0",
// repartition should happen prior to the filter to maximize parallelism
"RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
"GlobalLimitExec: skip=0, fetch=100",
// Limit doesn't benefit from input partitioning - no parallelism
"LocalLimitExec: fetch=100",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_union_inputs() -> Result<()> {
let plan = union_exec(vec![parquet_exec(); 5]);

let expected = &[
"UnionExec",
// Union doesn't benefit from input partitioning - no parallelism
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_prior_to_sort_preserving_merge() -> Result<()> {
// sort preserving merge over already sorted input
let plan = sort_preserving_merge_exec(parquet_exec_sorted());

// parallelization could potentially break the sort order
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_sort_preserving_merge_with_union() -> Result<()> {
// 2 sorted parquet files unioned (partitions are concatenated, sort is preserved)
let input = union_exec(vec![parquet_exec_sorted(); 2]);
let plan = sort_preserving_merge_exec(input);

// should not repartition / sort (as the data was already sorted)
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"UnionExec",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_does_not_destroy_sort() -> Result<()> {
// SortRequired
// Parquet(sorted)

let plan = sort_required_exec(parquet_exec_sorted());

// no parallelization to preserve sort order
let expected = &[
"SortRequiredExec",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

#[test]
fn parallelization_ignores_transitively_with_projection() -> Result<()> {
// sorted input
let plan = sort_preserving_merge_exec(projection_exec(parquet_exec_sorted()));

// data should not be repartitioned / resorted
let expected = &[
"SortPreservingMergeExec: [c1@0 ASC]",
"ProjectionExec: expr=[c1@0 as c1]",
"ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[c1@0 ASC], projection=[c1]",
];

assert_optimized!(expected, plan, 2, true, 10);
Ok(())
}

/// Models operators like BoundedWindowExec that require an input
/// ordering, but is easy to construct
#[derive(Debug)]
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/file_format/mod.rs
@@ -208,6 +208,10 @@ impl<'a> Display for FileGroupsDisplay<'a> {
first_file = false;

write!(f, "{}", pf.object_meta.location.as_ref())?;

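// When the scan was split into byte ranges by the repartitioning above,
// append the assigned range so plans render as e.g. `x:0..50`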
if let Some(range) = pf.range.as_ref() {
write!(f, ":{}..{}", range.start, range.end)?;
}
}
write!(f, "]")?;
}