20 changes: 9 additions & 11 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -233,12 +233,11 @@ fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

let expected = [
"GlobalLimitExec: skip=0, fetch=5",
" CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192, fetch=5",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
"CoalescePartitionsExec: fetch=5",
" CoalesceBatchesExec: target_batch_size=8192, fetch=5",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);

@@ -378,11 +377,10 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> R
LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

let expected = [
"GlobalLimitExec: skip=0, fetch=5",
" CoalescePartitionsExec",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
"CoalescePartitionsExec: fetch=5",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);

7 changes: 0 additions & 7 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -69,11 +69,6 @@ async fn explain_analyze_baseline_metrics() {
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"metrics=[output_rows=99, elapsed_compute="
);
assert_metrics!(
&formatted,
"GlobalLimitExec: skip=0, fetch=3, ",
"metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(
&formatted,
"ProjectionExec: expr=[count(*)",
@@ -101,9 +96,7 @@ async fn explain_analyze_baseline_metrics() {

plan.as_any().downcast_ref::<sorts::sort::SortExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::aggregates::AggregateExec>().is_some()
// CoalescePartitionsExec doesn't do any work so is not included
|| plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::limit::LocalLimitExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
9 changes: 0 additions & 9 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -146,15 +146,6 @@ pub fn pushdown_limit_helper(
global_state.skip = skip;
global_state.fetch = fetch;

if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
// If the child is a `CoalescePartitionsExec`, we should not remove the limit:
// pushing it down through the `CoalescePartitionsExec` to each partition will not guarantee the limit.
// TODO: we may have a better solution if we can support with_fetch for limit inside CoalescePartitionsExec.
// Follow-up issue: https://github.com/apache/datafusion/issues/14446
global_state.satisfied = true;
return Ok((Transformed::no(pushdown_plan), global_state));
}

// Now the global state has the most recent information, we can remove
// the `LimitExec` plan. We will decide later if we should add it again
// or not.
50 changes: 40 additions & 10 deletions datafusion/physical-plan/src/coalesce_partitions.rs
@@ -43,6 +43,8 @@ pub struct CoalescePartitionsExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
/// Optional number of rows to fetch. Stops producing rows after `fetch` rows are yielded.
pub(crate) fetch: Option<usize>,
}

impl CoalescePartitionsExec {
@@ -53,6 +55,7 @@ impl CoalescePartitionsExec {
input,
metrics: ExecutionPlanMetricsSet::new(),
cache,
fetch: None,
}
}

@@ -83,9 +86,12 @@ impl DisplayAs for CoalescePartitionsExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "CoalescePartitionsExec")
}
DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
Some(fetch) => {
write!(f, "CoalescePartitionsExec: fetch={fetch}")
}
None => write!(f, "CoalescePartitionsExec"),
},
}
}
}
@@ -116,9 +122,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(CoalescePartitionsExec::new(Arc::clone(
&children[0],
))))
let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
plan.fetch = self.fetch;
Ok(Arc::new(plan))
}

fn execute(
@@ -164,7 +170,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
}

let stream = builder.build();
Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))
Ok(Box::pin(ObservedStream::new(
stream,
baseline_metrics,
self.fetch,
)))
}
}
}
@@ -174,7 +184,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
}

fn statistics(&self) -> Result<Statistics> {
self.input.statistics()
Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
}

fn supports_limit_pushdown(&self) -> bool {
@@ -197,8 +207,28 @@ impl ExecutionPlan for CoalescePartitionsExec {
return Ok(None);
}
// CoalescePartitionsExec always has a single child, so zero indexing is safe.
make_with_child(projection, projection.input().children()[0])
.map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _))
make_with_child(projection, projection.input().children()[0]).map(|e| {
if self.fetch.is_some() {
let mut plan = CoalescePartitionsExec::new(e);
plan.fetch = self.fetch;
Some(Arc::new(plan) as _)
} else {
Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
}
})
}

fn fetch(&self) -> Option<usize> {
self.fetch
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Some(Arc::new(CoalescePartitionsExec {
input: Arc::clone(&self.input),
fetch: limit,
metrics: self.metrics.clone(),
cache: self.cache.clone(),
}))
}
}

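[Editor's note] For orientation, a minimal sketch of how a caller could use the fetch API added to CoalescePartitionsExec above; this is an illustration, not code from this PR, and push_fetch is a hypothetical helper name. with_fetch returns None when an operator cannot absorb a limit, so the caller keeps the original plan in that case:

use std::sync::Arc;
use datafusion_physical_plan::ExecutionPlan;

/// Hypothetical helper: try to absorb a row limit into an operator.
fn push_fetch(plan: Arc<dyn ExecutionPlan>, fetch: usize) -> Arc<dyn ExecutionPlan> {
    match plan.with_fetch(Some(fetch)) {
        // e.g. CoalescePartitionsExec now renders as "CoalescePartitionsExec: fetch=5"
        Some(limited) => limited,
        None => plan,
    }
}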
27 changes: 27 additions & 0 deletions datafusion/physical-plan/src/stream.rs
@@ -444,18 +444,44 @@ impl Stream for EmptyRecordBatchStream {
pub(crate) struct ObservedStream {
inner: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
fetch: Option<usize>,
produced: usize,
}

impl ObservedStream {
pub fn new(
inner: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
fetch: Option<usize>,
) -> Self {
Self {
inner,
baseline_metrics,
fetch,
produced: 0,
}
}

fn limit_reached(
&mut self,
poll: Poll<Option<Result<RecordBatch>>>,
) -> Poll<Option<Result<RecordBatch>>> {
let Some(fetch) = self.fetch else { return poll };

if self.produced >= fetch {
return Poll::Ready(None);
}

if let Poll::Ready(Some(Ok(batch))) = &poll {
if self.produced + batch.num_rows() > fetch {
let batch = batch.slice(0, fetch.saturating_sub(self.produced));
self.produced += batch.num_rows();
return Poll::Ready(Some(Ok(batch)));
};
self.produced += batch.num_rows()
}
poll
}
}

impl RecordBatchStream for ObservedStream {
@@ -472,6 +498,7 @@ impl Stream for ObservedStream {
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let poll = self.inner.poll_next_unpin(cx);
let poll = self.limit_reached(poll);
Contributor:

Nit:

let Some(fetch) = self.fetch else { return poll };

Maybe we can move this check up and only call limit_reached when a fetch is actually set:

if self.fetch.is_some() {
    poll = self.limit_reached(poll);
}

self.baseline_metrics.record_poll(poll)
}
}
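[Editor's note] As a reading aid, a standalone sketch of the truncation rule that ObservedStream::limit_reached implements above; apply_fetch is a hypothetical helper, assuming arrow's RecordBatch API. The idea: track rows produced so far and slice an incoming batch so that at most fetch rows are ever emitted.

use arrow::record_batch::RecordBatch;

/// Hypothetical helper mirroring the slicing in `limit_reached`:
/// returns None once the limit is satisfied, otherwise the batch,
/// truncated if it would overshoot the remaining row budget.
fn apply_fetch(batch: RecordBatch, produced: &mut usize, fetch: usize) -> Option<RecordBatch> {
    if *produced >= fetch {
        return None; // limit already reached, emit nothing further
    }
    let remaining = fetch - *produced;
    let batch = if batch.num_rows() > remaining {
        batch.slice(0, remaining) // keep only the rows still needed
    } else {
        batch
    };
    *produced += batch.num_rows();
    Some(batch)
}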
12 changes: 10 additions & 2 deletions datafusion/physical-plan/src/union.rs
@@ -237,7 +237,11 @@ impl ExecutionPlan for UnionExec {
if partition < input.output_partitioning().partition_count() {
let stream = input.execute(partition, context)?;
debug!("Found a Union partition to execute");
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
return Ok(Box::pin(ObservedStream::new(
stream,
baseline_metrics,
None,
)));
} else {
partition -= input.output_partitioning().partition_count();
}
@@ -448,7 +452,11 @@ impl ExecutionPlan for InterleaveExec {
self.schema(),
input_stream_vec,
));
return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
return Ok(Box::pin(ObservedStream::new(
stream,
baseline_metrics,
None,
)));
}

warn!("Error in InterleaveExec: Partition {} not found", partition);
23 changes: 11 additions & 12 deletions datafusion/sqllogictest/test_files/aggregate.slt
@@ -5032,18 +5032,17 @@
03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[min(aggregate_test_100.c1)]]
04)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
Member:
It seems that SKIP is missing.

@zhuqi-lucas (Contributor), Feb 5, 2025:
It looks like the SKIP setting is not supported by all with_fetch operators. For example:

SortPreservingMergeExec also supports fetch but does not support setting skip; it defaults to 0.

Contributor:
We default to skip=0 for SortPreservingMergeExec:

 1 => match self.fetch {
                Some(fetch) => {
                    let stream = self.input.execute(0, context)?;
                    debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input with {fetch}");
                    Ok(Box::pin(LimitStream::new(
                        stream,
                        0,
                        Some(fetch),
                        BaselineMetrics::new(&self.metrics, partition),
                    )))
                }
impl LimitStream {
    pub fn new(
        input: SendableRecordBatchStream,
        skip: usize,
        fetch: Option<usize>,
        baseline_metrics: BaselineMetrics,
    ) -> Self {
        let schema = input.schema();
        Self {
            skip,
            fetch: fetch.unwrap_or(usize::MAX),
            input: Some(input),
            schema,
            baseline_metrics,
        }
    }

Author:
Yes, as @zhuqi-lucas mentioned, only the Limit operators support skip, and limit_pushdown adds a Limit operator back when a skip exists, so this change only affects plans without skip.

Here's the query result with skip:

query TT
EXPLAIN SELECT DISTINCT c3, min(c1) FROM aggregate_test_100 group by c3 limit 5 offset 3;
----
logical_plan
01)Limit: skip=3, fetch=5
02)--Aggregate: groupBy=[[aggregate_test_100.c3, min(aggregate_test_100.c1)]], aggr=[[]]
03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[min(aggregate_test_100.c1)]]
04)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
01)GlobalLimitExec: skip=3, fetch=5
02)--CoalescePartitionsExec: fetch=8
03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[8]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[8]
07)------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
08)--------------CoalesceBatchesExec: target_batch_size=8192
09)----------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
10)------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
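[Editor's note] On the arithmetic in the plan above: when an offset is present, the optimizer keeps a GlobalLimitExec to apply the skip and lets the child absorb skip + fetch rows. A hedged sketch, with child_fetch as a hypothetical name:

/// Rows the child must produce so GlobalLimitExec can drop `skip`
/// rows and still return `fetch` rows.
fn child_fetch(skip: usize, fetch: Option<usize>) -> Option<usize> {
    fetch.map(|f| f + skip) // skip=3, fetch=5 -> child fetch=8, matching the plan above
}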

02)--CoalescePartitionsExec
03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
07)------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
08)--------------CoalesceBatchesExec: target_batch_size=8192
09)----------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
10)------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
01)CoalescePartitionsExec: fetch=5
02)--AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
06)----------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true


#
5 changes: 2 additions & 3 deletions datafusion/sqllogictest/test_files/limit.slt
@@ -852,9 +852,8 @@ physical_plan
01)ProjectionExec: expr=[foo@0 as foo]
02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
04)------GlobalLimitExec: skip=0, fetch=1
05)--------CoalescePartitionsExec
06)----------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1
04)------CoalescePartitionsExec: fetch=1
05)--------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1

query I
with selection as (
11 changes: 5 additions & 6 deletions datafusion/sqllogictest/test_files/repartition.slt
@@ -121,12 +121,11 @@ logical_plan
02)--Filter: sink_table.c3 > Int16(0)
03)----TableScan: sink_table projection=[c1, c2, c3]
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
02)--CoalescePartitionsExec
03)----CoalesceBatchesExec: target_batch_size=8192, fetch=5
04)------FilterExec: c3@2 > 0
05)--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
06)----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true
01)CoalescePartitionsExec: fetch=5
02)--CoalesceBatchesExec: target_batch_size=8192, fetch=5
03)----FilterExec: c3@2 > 0
04)------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
05)--------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

# Start repartition on empty column test.
# See https://github.com/apache/datafusion/issues/12057
43 changes: 21 additions & 22 deletions datafusion/sqllogictest/test_files/union.slt
@@ -510,28 +510,27 @@ logical_plan
19)------------Projection: Int64(1) AS c1
20)--------------EmptyRelation
physical_plan
01)GlobalLimitExec: skip=0, fetch=3
02)--CoalescePartitionsExec
03)----UnionExec
04)------ProjectionExec: expr=[count(*)@0 as cnt]
05)--------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
06)----------CoalescePartitionsExec
07)------------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
08)--------------ProjectionExec: expr=[]
09)----------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
10)------------------CoalesceBatchesExec: target_batch_size=2
11)--------------------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
12)----------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
13)------------------------CoalesceBatchesExec: target_batch_size=2
14)--------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
15)----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
16)------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
17)------ProjectionExec: expr=[1 as cnt]
18)--------PlaceholderRowExec
19)------ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
21)----------ProjectionExec: expr=[1 as c1]
22)------------PlaceholderRowExec
01)CoalescePartitionsExec: fetch=3
02)--UnionExec
03)----ProjectionExec: expr=[count(*)@0 as cnt]
04)------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
05)--------CoalescePartitionsExec
06)----------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
07)------------ProjectionExec: expr=[]
08)--------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
09)----------------CoalesceBatchesExec: target_batch_size=2
10)------------------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
11)--------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
12)----------------------CoalesceBatchesExec: target_batch_size=2
13)------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
14)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
15)----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
16)----ProjectionExec: expr=[1 as cnt]
17)------PlaceholderRowExec
18)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
19)------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
20)--------ProjectionExec: expr=[1 as c1]
21)----------PlaceholderRowExec


########