Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
e.wrap_msg(|s| {
format!(
"error with column selection, \
consider enabling `allow_missing_columns`: {s}"
consider passing `missing_columns='insert'`: {s}"
)
})
} else {
Expand Down
20 changes: 20 additions & 0 deletions crates/polars-python/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ impl PyScanOptions<'_> {
)
}

/// Resolves the `missing_columns` attribute of the wrapped Python object into a
/// [`MissingColumnsPolicy`].
///
/// The attribute is read as a string: `"insert"` maps to
/// `MissingColumnsPolicy::Insert` and `"raise"` maps to
/// `MissingColumnsPolicy::Raise`.
///
/// # Errors
///
/// Propagates any error raised by the attribute lookup or by extracting the
/// value as a string, and returns a `PyValueError` when the string is neither
/// `"insert"` nor `"raise"`.
pub fn missing_columns_policy(&self) -> PyResult<MissingColumnsPolicy> {
    let py = self.0.py();

    // Keep the extracted string alive in a local so it can be matched as `&str`.
    let raw = self
        .0
        .getattr(intern!(py, "missing_columns"))?
        .extract::<PyBackedStr>()?;

    match &*raw {
        "insert" => Ok(MissingColumnsPolicy::Insert),
        "raise" => Ok(MissingColumnsPolicy::Raise),
        v => Err(PyValueError::new_err(format!(
            "unknown option for missing_columns: {v}"
        ))),
    }
}

pub fn extract_cast_options(&self) -> PyResult<CastColumnsPolicy> {
let py = self.0.py();
let ob = self.0.getattr(intern!(py, "cast_options"))?;
Expand Down
10 changes: 2 additions & 8 deletions crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ impl PyLazyFrame {
#[pyo3(signature = (
source, sources, n_rows, cache, parallel, rechunk, row_index, low_memory, cloud_options,
credential_provider, use_statistics, hive_partitioning, schema, hive_schema,
try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns,
scan_options,
try_parse_hive_dates, retries, glob, include_file_paths, scan_options,
))]
fn new_from_parquet(
source: Option<PyObject>,
Expand All @@ -326,7 +325,6 @@ impl PyLazyFrame {
retries: usize,
glob: bool,
include_file_paths: Option<String>,
allow_missing_columns: bool,
scan_options: PyScanOptions,
) -> PyResult<Self> {
use cloud::credential_provider::PlCredentialProvider;
Expand Down Expand Up @@ -398,11 +396,7 @@ impl PyLazyFrame {
row_index,
pre_slice: n_rows.map(|len| Slice::Positive { offset: 0, len }),
cast_columns_policy: scan_options.extract_cast_options()?,
missing_columns_policy: if allow_missing_columns {
MissingColumnsPolicy::Insert
} else {
MissingColumnsPolicy::Raise
},
missing_columns_policy: scan_options.missing_columns_policy()?,
extra_columns_policy: scan_options.extra_columns_policy()?,
include_file_paths: include_file_paths.map(|x| x.into()),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub fn initialize_missing_columns_policy(
{
polars_bail!(
ColumnNotFound:
"did not find column {}, consider enabling `allow_missing_columns`",
"did not find column {}, consider passing `missing_columns='insert'`",
col,
)
}
Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/io/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def _split_schema(
file_uris,
schema=main_schema,
hive_schema=hive_schema if len(partition_columns) > 0 else None,
allow_missing_columns=True,
missing_columns="insert",
hive_partitioning=len(partition_columns) > 0,
storage_options=storage_options,
credential_provider=credential_provider_builder, # type: ignore[arg-type]
Expand Down
2 changes: 1 addition & 1 deletion py-polars/polars/io/iceberg/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def to_dataset_scan(

return scan_parquet(
sources,
allow_missing_columns=True,
missing_columns="insert",
)

elif reader_override == "native":
Expand Down
66 changes: 57 additions & 9 deletions py-polars/polars/io/parquet/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

import polars.functions as F
from polars import concat as plconcat
from polars._utils.deprecation import deprecate_renamed_parameter
from polars._utils.deprecation import (
deprecate_renamed_parameter,
issue_deprecation_warning,
)
from polars._utils.unstable import issue_unstable_warning
from polars._utils.various import (
is_int_sequence,
Expand Down Expand Up @@ -66,7 +69,8 @@ def read_parquet(
pyarrow_options: dict[str, Any] | None = None,
memory_map: bool = True,
include_file_paths: str | None = None,
allow_missing_columns: bool = False,
missing_columns: Literal["insert", "raise"] = "raise",
allow_missing_columns: bool | None = None,
) -> DataFrame:
"""
Read into a DataFrame from a parquet file.
Expand Down Expand Up @@ -114,7 +118,7 @@ def read_parquet(
schema
Specify the datatypes of the columns. The datatypes must match the
datatypes in the file(s). If there are extra columns that are not in the
file(s), consider also enabling `allow_missing_columns`.
file(s), consider also passing `missing_columns='insert'`.

.. warning::
This functionality is considered **unstable**. It may be changed
Expand Down Expand Up @@ -169,13 +173,24 @@ def read_parquet(
include_file_paths
Include the path of the source file(s) as a column with this name.
Only valid when `use_pyarrow=False`.
missing_columns
Configuration for behavior when columns defined in the schema
are missing from the data:

* `insert`: Inserts the missing columns using NULLs as the row values.
* `raise`: Raises an error.

allow_missing_columns
When reading a list of parquet files, if a column existing in the first
file cannot be found in subsequent files, the default behavior is to
raise an error. However, if `allow_missing_columns` is set to
`True`, a full-NULL column is returned instead of erroring for the files
that do not contain the column.

.. deprecated:: 1.30.0
Use the parameter `missing_columns` instead and pass one of
`('insert', 'raise')`.

Returns
-------
DataFrame
Expand Down Expand Up @@ -227,6 +242,16 @@ def read_parquet(
rechunk=rechunk,
)

if allow_missing_columns is not None:
issue_deprecation_warning(
"the parameter `allow_missing_columns` for `read_parquet` is deprecated. "
"Use the parameter `missing_columns` instead and pass one of "
"`('insert', 'raise')`.",
version="1.30.0",
)

missing_columns = "insert" if allow_missing_columns else "raise"

# For other inputs, defer to `scan_parquet`
lf = scan_parquet(
source,
Expand All @@ -247,7 +272,7 @@ def read_parquet(
retries=retries,
glob=glob,
include_file_paths=include_file_paths,
allow_missing_columns=allow_missing_columns,
missing_columns=missing_columns,
)

if columns is not None:
Expand Down Expand Up @@ -385,7 +410,8 @@ def scan_parquet(
credential_provider: CredentialProviderFunction | Literal["auto"] | None = "auto",
retries: int = 2,
include_file_paths: str | None = None,
allow_missing_columns: bool = False,
missing_columns: Literal["insert", "raise"] = "raise",
allow_missing_columns: bool | None = None,
extra_columns: Literal["ignore", "raise"] = "raise",
cast_options: ScanCastOptions | None = None,
) -> LazyFrame:
Expand All @@ -399,6 +425,9 @@ def scan_parquet(
* The `row_count_name` parameter was renamed `row_index_name`.
* The `row_count_offset` parameter was renamed `row_index_offset`.

.. versionchanged:: 1.30.0
* The `allow_missing_columns` parameter is deprecated in favor of `missing_columns`.

Parameters
----------
source
Expand Down Expand Up @@ -442,7 +471,7 @@ def scan_parquet(
schema
Specify the datatypes of the columns. The datatypes must match the
datatypes in the file(s). If there are extra columns that are not in the
file(s), consider also enabling `allow_missing_columns`.
file(s), consider also passing `missing_columns='insert'`.

.. warning::
This functionality is considered **unstable**. It may be changed
Expand Down Expand Up @@ -489,12 +518,23 @@ def scan_parquet(
Number of retries if accessing a cloud instance fails.
include_file_paths
Include the path of the source file(s) as a column with this name.
missing_columns
Configuration for behavior when columns defined in the schema
are missing from the data:

* `insert`: Inserts the missing columns using NULLs as the row values.
* `raise`: Raises an error.

allow_missing_columns
When reading a list of parquet files, if a column existing in the first
file cannot be found in subsequent files, the default behavior is to
raise an error. However, if `allow_missing_columns` is set to
`True`, a full-NULL column is returned instead of erroring for the files
that do not contain the column.

.. deprecated:: 1.30.0
Use the parameter `missing_columns` instead and pass one of
`('insert', 'raise')`.
extra_columns
Configuration for behavior when extra columns outside of the
defined schema are encountered in the data:
Expand Down Expand Up @@ -555,6 +595,16 @@ def scan_parquet(
credential_provider, source, storage_options, "scan_parquet"
)

if allow_missing_columns is not None:
issue_deprecation_warning(
"the parameter `allow_missing_columns` for `scan_parquet` is deprecated. "
"Use the parameter `missing_columns` instead and pass one of "
"`('insert', 'raise')`.",
version="1.30.0",
)

missing_columns = "insert" if allow_missing_columns else "raise"

return _scan_parquet_impl(
source, # type: ignore[arg-type]
n_rows=n_rows,
Expand All @@ -574,10 +624,10 @@ def scan_parquet(
retries=retries,
glob=glob,
include_file_paths=include_file_paths,
allow_missing_columns=allow_missing_columns,
scan_options=ScanOptions(
cast_options=cast_options,
extra_columns=extra_columns,
missing_columns=missing_columns,
),
)

Expand All @@ -602,7 +652,6 @@ def _scan_parquet_impl(
try_parse_hive_dates: bool = True,
retries: int = 2,
include_file_paths: str | None = None,
allow_missing_columns: bool = False,
scan_options: ScanOptions,
) -> LazyFrame:
if isinstance(source, list):
Expand Down Expand Up @@ -636,7 +685,6 @@ def _scan_parquet_impl(
retries=retries,
glob=glob,
include_file_paths=include_file_paths,
allow_missing_columns=allow_missing_columns,
scan_options=scan_options,
)

Expand Down
4 changes: 4 additions & 0 deletions py-polars/polars/io/scan_options/_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@


class ScanOptions:
    """
    Holds options for UnifiedScanArgs.

    Every option is keyword-only and is stored unchanged on the instance under
    an attribute of the same name; no validation happens in this class.

    Parameters
    ----------
    cast_options
        Cast configuration for the scan, or `None`.
    extra_columns
        One of `"ignore"` or `"raise"`.
    missing_columns
        One of `"insert"` or `"raise"`. Defaults to `"raise"`.
    """

    def __init__(
        self,
        *,
        cast_options: ScanCastOptions | None,
        extra_columns: Literal["ignore", "raise"],
        missing_columns: Literal["insert", "raise"] = "raise",
    ) -> None:
        # Plain attribute storage: the values are kept exactly as they were passed.
        self.cast_options = cast_options
        self.extra_columns = extra_columns
        self.missing_columns = missing_columns
18 changes: 9 additions & 9 deletions py-polars/tests/unit/io/test_lazy_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ def test_parquet_schema_arg(
lf.collect(engine="streaming" if streaming else "in-memory")

lf = pl.scan_parquet(
paths, parallel=parallel, schema=schema, allow_missing_columns=True
paths, parallel=parallel, schema=schema, missing_columns="insert"
)

assert_frame_equal(
Expand All @@ -764,17 +764,17 @@ def test_parquet_schema_arg(
# Just one test that `read_parquet` is propagating this argument.
assert_frame_equal(
pl.read_parquet(
paths, parallel=parallel, schema=schema, allow_missing_columns=True
paths, parallel=parallel, schema=schema, missing_columns="insert"
),
pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),
)

# Issue #19081: If a schema arg is passed, ensure its fields are propagated
# to the IR, otherwise even if `allow_missing_columns=True`, downstream
# to the IR, otherwise even if `missing_columns='insert'`, downstream
# `select()`s etc. will fail with ColumnNotFound if the column is not in
# the first file.
lf = pl.scan_parquet(
paths, parallel=parallel, schema=schema, allow_missing_columns=True
paths, parallel=parallel, schema=schema, missing_columns="insert"
).select("1")

s = lf.collect(engine="streaming" if streaming else "in-memory").to_series()
Expand All @@ -785,12 +785,12 @@ def test_parquet_schema_arg(

schema: dict[str, type[pl.DataType]] = {"a": pl.Int64} # type: ignore[no-redef]

for allow_missing_columns in [True, False]:
for missing_columns in ["insert", "raise"]:
lf = pl.scan_parquet(
paths,
parallel=parallel,
schema=schema,
allow_missing_columns=allow_missing_columns,
missing_columns=missing_columns, # type: ignore[arg-type]
)

with pytest.raises(pl.exceptions.SchemaError):
Expand Down Expand Up @@ -851,11 +851,11 @@ def test_scan_parquet_schema_specified_with_empty_files_list(tmp_path: Path) ->
)


@pytest.mark.parametrize("allow_missing_columns", [True, False])
@pytest.mark.parametrize("missing_columns", ["insert", "raise"])
@pytest.mark.write_disk
def test_scan_parquet_ignores_dtype_mismatch_for_non_projected_columns_19249(
tmp_path: Path,
allow_missing_columns: bool,
missing_columns: str,
) -> None:
tmp_path.mkdir(exist_ok=True)
paths = [tmp_path / "1", tmp_path / "2"]
Expand All @@ -868,7 +868,7 @@ def test_scan_parquet_ignores_dtype_mismatch_for_non_projected_columns_19249(
).write_parquet(paths[1])

assert_frame_equal(
pl.scan_parquet(paths, allow_missing_columns=allow_missing_columns)
pl.scan_parquet(paths, missing_columns=missing_columns) # type: ignore[arg-type]
.select("a")
.collect(engine="in-memory"),
pl.DataFrame({"a": [1, 1]}, schema={"a": pl.Int32}),
Expand Down
8 changes: 4 additions & 4 deletions py-polars/tests/unit/io/test_multiscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ def test_multiscan_projection(
ri = "row_index" if row_index else None

args = {
"allow_missing_columns": missing_column,
"missing_columns": "insert" if missing_column else "raise",
"include_file_paths": ifp,
"row_index_name": ri,
"hive_partitioning": hive,
}

if not supports_missing_columns:
del args["allow_missing_columns"]
del args["missing_columns"]
if not supports_hive_partitioning:
del args["hive_partitioning"]

Expand Down Expand Up @@ -685,12 +685,12 @@ def test_extra_columns_not_ignored_22218() -> None:
pl.exceptions.SchemaError,
match="extra column in file outside of expected schema: c, hint: specify .*or pass",
):
(pl.scan_parquet(files, allow_missing_columns=True).select(pl.all()).collect())
(pl.scan_parquet(files, missing_columns="insert").select(pl.all()).collect())

assert_frame_equal(
pl.scan_parquet(
files,
allow_missing_columns=True,
missing_columns="insert",
extra_columns="ignore",
)
.select(pl.all())
Expand Down
Loading
Loading