Skip to content

Commit b6d08f7

Browse files
Merge branch 'main' into arrow-pushdown-pyo
2 parents 145565d + 21f150f commit b6d08f7

82 files changed

Lines changed: 2304 additions & 406 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/lint-rust.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,15 @@ jobs:
103103
--features object
104104
-p polars-core
105105
# -p polars-arrow
106+
107+
deny:
108+
if: github.ref_name != 'main'
109+
runs-on: ubuntu-latest
110+
steps:
111+
- uses: actions/checkout@v6
112+
113+
- name: Install cargo-deny
114+
uses: taiki-e/install-action@cargo-deny
115+
116+
- name: Run cargo deny
117+
run: cargo deny check

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ fmt: update-cargo-env ## Run autoformatting and linting
160160
dprint fmt
161161
$(VENV_BIN)/typos
162162

163+
.PHONY: deny
164+
deny:
165+
cargo deny check
166+
163167
.PHONY: fix
164168
fix: update-cargo-env
165169
cargo clippy --workspace --all-targets --all-features --fix

crates/polars-expr/src/expressions/aggregation.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -425,11 +425,23 @@ impl PhysicalExpr for AggregationExpr {
425425
AggregatedScalar(agg_s.with_name(keep_name))
426426
},
427427
GroupByMethod::Implode { maintain_order: _ } => {
428-
AggregatedScalar(match ac.agg_state() {
428+
let col = match ac.agg_state() {
429429
AggState::LiteralScalar(_) => unreachable!(), // handled above
430430
AggState::AggregatedScalar(c) => c.as_list().into_column(),
431-
AggState::NotAggregated(_) | AggState::AggregatedList(_) => ac.aggregated(),
432-
})
431+
AggState::AggregatedList(c) => c.clone(),
432+
AggState::NotAggregated(_) => ac.aggregated(),
433+
};
434+
// TODO: Introduce `UpdateGroups::WithUnitLen` as a new lazy `groups()` method
435+
// and move the groups constructor there. Then, set `UpdateGroups::WithUnitLen` to
436+
// all AggregationExprs.
437+
let groups = Cow::Owned({
438+
let groups = (0..col.len() as IdxSize).map(|i| [i, 1]).collect();
439+
GroupsType::new_slice(groups, false, true).into_sliceable()
440+
});
441+
return Ok(AggregationContext::from_agg_state(
442+
AggregatedScalar(col),
443+
groups,
444+
));
433445
},
434446
GroupByMethod::Groups => {
435447
let mut column: ListChunked = ac.groups().as_list_chunked();

crates/polars-lazy/src/frame/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1942,7 +1942,12 @@ impl LazyFrame {
19421942
}
19431943

19441944
#[cfg(feature = "merge_sorted")]
1945-
pub fn merge_sorted<S>(self, other: LazyFrame, key: S) -> PolarsResult<LazyFrame>
1945+
pub fn merge_sorted<S>(
1946+
self,
1947+
other: LazyFrame,
1948+
key: S,
1949+
maintain_order: bool,
1950+
) -> PolarsResult<LazyFrame>
19461951
where
19471952
S: Into<PlSmallStr>,
19481953
{
@@ -1952,6 +1957,7 @@ impl LazyFrame {
19521957
input_left: Arc::new(self.logical_plan),
19531958
input_right: Arc::new(other.logical_plan),
19541959
key,
1960+
maintain_order,
19551961
};
19561962
Ok(LazyFrame::from_logical_plan(lp, self.opt_state))
19571963
}

crates/polars-mem-engine/src/executors/sort.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,13 @@ pub(crate) struct SortExec {
1010
}
1111

1212
impl SortExec {
13-
fn execute_impl(&mut self, state: &ExecutionState, df: DataFrame) -> PolarsResult<DataFrame> {
13+
fn execute_impl(
14+
&mut self,
15+
state: &ExecutionState,
16+
mut df: DataFrame,
17+
) -> PolarsResult<DataFrame> {
1418
state.should_stop()?;
19+
df.rechunk_mut_par();
1520

1621
let height = df.height();
1722

crates/polars-mem-engine/src/planner/lp.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,8 @@ fn create_physical_plan_impl(
809809
input_left,
810810
input_right,
811811
key,
812+
// In the in-memory engine, merge_sorted is always order-maintaining.
813+
maintain_order: _,
812814
} => {
813815
let (input_left, input_right) = state.with_new_branch(|new_state| {
814816
(

crates/polars-ops/src/frame/join/merge_sorted.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
use arrow::legacy::utils::CustomIterTools;
2+
#[cfg(feature = "dtype-categorical")]
3+
use polars_core::datatypes::CategoricalPhysical;
24
use polars_core::prelude::*;
3-
use polars_core::{with_match_categorical_physical_type, with_match_physical_numeric_polars_type};
5+
#[cfg(feature = "dtype-categorical")]
6+
use polars_core::with_match_categorical_physical_type;
7+
use polars_core::with_match_physical_numeric_polars_type;
48

59
pub fn _merge_sorted_dfs(
610
left: &DataFrame,
@@ -185,6 +189,7 @@ where
185189
}
186190

187191
fn series_to_merge_indicator(lhs: &Series, rhs: &Series) -> PolarsResult<Vec<bool>> {
192+
#[cfg(feature = "dtype-categorical")]
188193
if lhs.dtype().is_categorical() || lhs.dtype().is_enum() {
189194
let cat_phys = lhs.dtype().cat_physical().unwrap();
190195
with_match_categorical_physical_type!(cat_phys, |$C| {

crates/polars-plan/dsl-schema-hashes.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
"Dimension": "68880cdb10230df6c8c1632b073c80bd8ceb5c56a368c0cb438431ca9f3d3b31",
4545
"DistinctOptionsDSL": "41be5ec69ef9a614f2b36ac5deadfecdea5cca847ae1ada9d4bc626ff52a5b38",
4646
"DslFunction": "221f1a46a043c8ed54f57be981bf24509f04f5f91f0f08e0acc180d96f842ebf",
47-
"DslPlan": "14caf5b73e69c4975ff3a57331891521ff5b78c96bbaf8d6cc9be57c82f3ea98",
47+
"DslPlan": "037aeb1be892efd716c6934961e6df74dcd38815064b6d7efa72efe41e6e913d",
4848
"Duration": "44999d59023085cbb592ce94b30d34f9b983081fc72bd6435a49bdf0869c0074",
4949
"Duration2": "f251cb1bee2955a17c6defe1573bce21ddbe6cdf6eb9324a19cd37932ab29347",
5050
"DynListLiteralValue": "2266a553cb4a943f7097f24539eaa802453cf8742675996215235bd682dec0e8",

crates/polars-plan/src/dsl/plan.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ pub enum DslPlan {
169169
input_left: Arc<DslPlan>,
170170
input_right: Arc<DslPlan>,
171171
key: PlSmallStr,
172+
maintain_order: bool,
172173
},
173174
IR {
174175
// Keep the original Dsl around as we need that for serialization.
@@ -211,7 +212,7 @@ impl Clone for DslPlan {
211212
#[cfg(feature = "pivot")]
212213
Self::Pivot { input, on, on_columns, index, values, agg, separator, maintain_order, column_naming } => Self::Pivot { input: input.clone(), on: on.clone(), on_columns: on_columns.clone(), index: index.clone(), values: values.clone(), agg: agg.clone(), separator: separator.clone(), maintain_order: *maintain_order, column_naming: *column_naming },
213214
#[cfg(feature = "merge_sorted")]
214-
Self::MergeSorted { input_left, input_right, key } => Self::MergeSorted { input_left: input_left.clone(), input_right: input_right.clone(), key: key.clone() },
215+
Self::MergeSorted { input_left, input_right, key, maintain_order } => Self::MergeSorted { input_left: input_left.clone(), input_right: input_right.clone(), key: key.clone(), maintain_order: *maintain_order },
215216
Self::IR {node, dsl, version} => Self::IR {node: *node, dsl: dsl.clone(), version: *version},
216217
}
217218
}

crates/polars-plan/src/dsl/serializable_plan.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ pub(crate) enum SerializableDslPlanNode {
146146
input_left: DslPlanKey,
147147
input_right: DslPlanKey,
148148
key: PlSmallStr,
149+
maintain_order: bool,
149150
},
150151
IR {
151152
dsl: DslPlanKey,
@@ -360,10 +361,12 @@ fn convert_dsl_plan_to_serializable_plan(
360361
input_left,
361362
input_right,
362363
key,
364+
maintain_order,
363365
} => SP::MergeSorted {
364366
input_left: dsl_plan_key(input_left, arenas),
365367
input_right: dsl_plan_key(input_right, arenas),
366368
key: key.clone(),
369+
maintain_order: *maintain_order,
367370
},
368371
DP::IR {
369372
dsl,
@@ -608,10 +611,12 @@ fn try_convert_serializable_plan_to_dsl_plan(
608611
input_left,
609612
input_right,
610613
key,
614+
maintain_order,
611615
} => Ok(DP::MergeSorted {
612616
input_left: get_dsl_plan(*input_left, ser_dsl_plan, arenas)?,
613617
input_right: get_dsl_plan(*input_right, ser_dsl_plan, arenas)?,
614618
key: key.clone(),
619+
maintain_order: *maintain_order,
615620
}),
616621
SP::IR {
617622
dsl: dsl_key,

0 commit comments

Comments
 (0)