2 changes: 1 addition & 1 deletion .github/workflows/test-python.yml
@@ -98,7 +98,7 @@ jobs:
        if: github.ref_name != 'main' && matrix.python-version != '3.13t'
        env:
          POLARS_TIMEOUT_MS: 60000
-       run: pytest -n auto -m "not release and not benchmark and not docs"
+       run: pytest -sv tests/unit/io/test_iceberg.py -m "" # -n auto -m "not release and not benchmark and not docs"

      - name: Run tests with new streaming engine
        if: github.ref_name != 'main' && matrix.python-version != '3.13t'
14 changes: 14 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
@@ -1124,6 +1124,20 @@ impl LazyFrame {
        }))
    }

+    #[cfg(feature = "python")]
+    pub fn sink_iceberg(
+        self,
+        dataset: Arc<crate::dsl::python_dataset::PythonDatasetProvider>,
+        cloud_options: Option<polars_io::cloud::CloudOptions>,
+        mode: IcebergWriteMode,
+    ) -> PolarsResult<Self> {
+        self.sink(SinkType::Dataset(DatasetSinkType {
+            dataset,
+            cloud_options,
+            mode,
+        }))
+    }
+
    #[cfg(feature = "new_streaming")]
    pub fn try_new_streaming_if_requested(
        &mut self,
2 changes: 1 addition & 1 deletion crates/polars-plan/dsl-schema.sha256
@@ -1 +1 @@
-839943defff2e79e3ab3db5d0997f7ea73683f5b4bca8d9a7578cce6a6d99f0a
+839943defff2e79e3ab3db5d0997f7ea73683f5b4bca8d9a7578cce6a6d99f0a
4 changes: 4 additions & 0 deletions crates/polars-plan/src/client/check.rs
@@ -57,6 +57,10 @@ pub(super) fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> {
                SinkType::Partition(_) => {
                    return ineligible_error("contains partition sink");
                },
+                #[cfg(feature = "python")]
+                SinkType::Dataset(_) => {
+                    return ineligible_error("contains dataset sink");
+                },
            }
        },
        DslPlan::SinkMultiple { .. } => {
12 changes: 11 additions & 1 deletion crates/polars-plan/src/dsl/file_scan/python_dataset.rs
@@ -6,7 +6,7 @@ use polars_core::schema::SchemaRef;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::python_function::PythonObject;

-use crate::dsl::DslPlan;
+use crate::dsl::{DslPlan, IcebergWriteMode};

/// This is for `polars-python` to inject so that the implementation can be done there:
/// * The impls for converting from Python objects are there.
@@ -23,6 +23,12 @@ pub struct PythonDatasetProviderVTable {
        limit: Option<usize>,
        projection: Option<&[PlSmallStr]>,
    ) -> PolarsResult<DslPlan>,
+
+    pub to_dataset_sink: fn(
+        dataset_object: &PythonObject,
+        input: DslPlan,
+        mode: IcebergWriteMode,
+    ) -> PolarsResult<DslPlan>,
}

pub fn dataset_provider_vtable() -> Result<&'static PythonDatasetProviderVTable, &'static str> {
@@ -63,4 +69,8 @@ impl PythonDatasetProvider {
            projection,
        )
    }
+
+    pub fn to_dataset_sink(&self, input: DslPlan, mode: IcebergWriteMode) -> PolarsResult<DslPlan> {
+        (dataset_provider_vtable().unwrap().to_dataset_sink)(&self.dataset_object, input, mode)
+    }
}
25 changes: 24 additions & 1 deletion crates/polars-plan/src/dsl/options/sink.rs
@@ -14,6 +14,7 @@ use polars_utils::IdxSize;
use polars_utils::arena::Arena;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::plpath::PlPath;
+use strum_macros::IntoStaticStr;

use super::{ExprIR, FileType};
use crate::dsl::{AExpr, Expr, SpecialEq};
@@ -496,13 +497,35 @@ pub struct PartitionSinkTypeIR {
    pub finish_callback: Option<SinkFinishCallback>,
}

+#[cfg(feature = "python")]
+#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
+#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
-#[derive(Clone, Debug, PartialEq)]
+#[derive(Clone, Copy, Debug, IntoStaticStr)]
+#[strum(serialize_all = "camelCase")]
+pub enum IcebergWriteMode {
+    Overwrite,
+    Append,
+}
+
+#[cfg(feature = "python")]
+#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
+#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
+#[derive(Clone, Debug)]
+pub struct DatasetSinkType {
+    pub dataset: Arc<crate::dsl::python_dataset::PythonDatasetProvider>,
+    pub cloud_options: Option<polars_io::cloud::CloudOptions>,
+    pub mode: IcebergWriteMode,
+}
+
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Debug)]
pub enum SinkType {
    Memory,
    File(FileSinkType),
    Partition(PartitionSinkType),
+    #[cfg(feature = "python")]
+    Dataset(DatasetSinkType),
}

#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
24 changes: 24 additions & 0 deletions crates/polars-plan/src/plans/conversion/dataset/iceberg.rs
@@ -0,0 +1,24 @@
use std::sync::Arc;

use polars_core::error::PolarsResult;

use crate::dsl::{DslPlan, IcebergWriteMode, SinkType};

pub fn iceberg_dataset_to_dsl(
    input: DslPlan,
    dataset: &crate::dsl::python_dataset::PythonDatasetProvider,
    cloud_options: Option<polars_io::cloud::CloudOptions>,
    mode: IcebergWriteMode,
) -> PolarsResult<(Arc<DslPlan>, SinkType)> {
    match dataset.to_dataset_sink(input, mode)? {
        DslPlan::Sink { input, mut payload } => {
            match &mut payload {
                SinkType::Partition(f) => f.cloud_options = cloud_options,
                _ => unreachable!(),
            }

            Ok((input, payload))
        },
        _ => unreachable!(),
    }
}
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/conversion/dataset/mod.rs
@@ -0,0 +1 @@
pub mod iceberg;
14 changes: 14 additions & 0 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir/mod.rs
@@ -866,9 +866,21 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
            }
        },
        DslPlan::Sink { input, payload } => {
+            #[cfg(feature = "python")]
+            let (input, payload) = match payload {
+                SinkType::Dataset(f) => dataset::iceberg::iceberg_dataset_to_dsl(
+                    input.as_ref().clone(),
+                    f.dataset.as_ref(),
+                    f.cloud_options,
+                    f.mode,
+                )?,
+                p => (input, p),
+            };
+
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
+
            let payload = match payload {
                SinkType::Memory => SinkTypeIR::Memory,
                SinkType::File(f) => SinkTypeIR::File(f),
@@ -929,6 +941,8 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
                    },
                    finish_callback: f.finish_callback,
                }),
+                #[cfg(feature = "python")]
+                SinkType::Dataset(_) => unreachable!(),
            };

            let lp = IR::Sink { input, payload };
2 changes: 2 additions & 0 deletions crates/polars-plan/src/plans/conversion/mod.rs
@@ -1,4 +1,6 @@
mod convert_utils;
+#[cfg(feature = "python")]
+mod dataset;
mod dsl_to_ir;
mod ir_to_dsl;
mod stack_opt;
29 changes: 28 additions & 1 deletion crates/polars-python/src/dataset/dataset_provider_funcs.rs
@@ -1,7 +1,7 @@
//! Note: Currently only used for iceberg.
use std::sync::Arc;

-use polars::prelude::{DslPlan, PlSmallStr, Schema, SchemaRef};
+use polars::prelude::{DslPlan, IcebergWriteMode, LazyFrame, PlSmallStr, Schema, SchemaRef};
use polars_core::config;
use polars_error::PolarsResult;
use polars_utils::python_function::PythonObject;
@@ -12,6 +12,7 @@ use pyo3::types::{PyAnyMethods, PyDict, PyList, PyListMethods};
use pyo3::{PyResult, Python};

use crate::interop::arrow::to_rust::field_to_rust;
+use crate::lazyframe::PyLazyFrame;
use crate::prelude::{Wrap, get_lf};

pub fn reader_name(dataset_object: &PythonObject) -> PlSmallStr {
@@ -110,3 +111,29 @@ pub fn to_dataset_scan(
        Ok(lf.logical_plan)
    })
}
+
+pub fn to_dataset_sink(
+    dataset_object: &PythonObject,
+    lf: DslPlan,
+    mode: IcebergWriteMode,
+) -> PolarsResult<DslPlan> {
+    Python::with_gil(|py| {
+        let kwargs = PyDict::new(py);
+
+        let lf = LazyFrame::from(lf);
+        kwargs.set_item("lf", PyLazyFrame::from(lf))?;
+        kwargs.set_item("mode", <&'static str>::from(mode))?;
+
+        let sink = dataset_object
+            .getattr(py, "_to_dataset_sink")?
+            .call(py, (), Some(&kwargs))?;
+
+        let Ok(lf) = get_lf(sink.bind(py)) else {
+            return Err(
+                PyValueError::new_err(format!("cannot extract LazyFrame from {}", &sink)).into(),
+            );
+        };
+
+        Ok(lf.logical_plan)
+    })
+}
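For orientation, here is a sketch of the Python-side hook that `to_dataset_sink` drives. Everything below is inferred from the Rust call site rather than taken from Python code in this diff: the `_to_dataset_sink` name and its `lf`/`mode` kwargs come from the `getattr`/`call` above, the LazyFrame return requirement comes from `get_lf`, and the partition-sink requirement comes from the `unreachable!()` arms in `iceberg_dataset_to_dsl`. The class name and sink construction are hypothetical placeholders.

```python
import polars as pl


class HypotheticalIcebergDataset:
    """Sketch of a dataset object implementing the `_to_dataset_sink` protocol."""

    def _to_dataset_sink(self, lf: pl.LazyFrame, mode: str) -> pl.LazyFrame:
        # `mode` arrives as the strum camelCase rendering of IcebergWriteMode;
        # both variants happen to be single lowercase words: "overwrite"/"append".
        if mode not in ("overwrite", "append"):
            raise ValueError(f"unsupported write mode: {mode}")
        # The returned plan must terminate in a partition sink, since
        # iceberg_dataset_to_dsl panics on anything else. A lazy partitioned
        # parquet sink is one way to build such a plan; the path and size
        # below are placeholders.
        return lf.sink_parquet(
            pl.PartitionMaxSize("s3://bucket/warehouse/tbl/data", max_size=512 << 20),
            lazy=True,
        )
```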
27 changes: 27 additions & 0 deletions crates/polars-python/src/lazyframe/general.rs
@@ -3,6 +3,7 @@ use std::num::NonZeroUsize;

use either::Either;
use polars::io::{HiveOptions, RowIndex};
+use polars::prelude::python_dataset::PythonDatasetProvider;
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
@@ -423,6 +424,7 @@ impl PyLazyFrame {
            reader_name: dataset_provider_funcs::reader_name,
            schema: dataset_provider_funcs::schema,
            to_dataset_scan: dataset_provider_funcs::to_dataset_scan,
+            to_dataset_sink: dataset_provider_funcs::to_dataset_sink,
        });

        let lf =
@@ -979,6 +981,31 @@ impl PyLazyFrame {
        .map_err(Into::into)
    }

+    #[pyo3(signature = (dataset_object, mode))]
+    fn sink_iceberg(
+        &self,
+        py: Python<'_>,
+        dataset_object: PyObject,
+        mode: Wrap<IcebergWriteMode>,
+    ) -> PyResult<Self> {
+        use crate::dataset::dataset_provider_funcs;
+
+        polars_plan::dsl::DATASET_PROVIDER_VTABLE.get_or_init(|| PythonDatasetProviderVTable {
+            reader_name: dataset_provider_funcs::reader_name,
+            schema: dataset_provider_funcs::schema,
+            to_dataset_scan: dataset_provider_funcs::to_dataset_scan,
+            to_dataset_sink: dataset_provider_funcs::to_dataset_sink,
+        });
+
+        let dataset = Arc::new(PythonDatasetProvider::new(PythonObject(dataset_object)));
+        py.enter_polars(|| {
+            let ldf = self.ldf.clone();
+            ldf.sink_iceberg(dataset, None, mode.0)
+        })
+        .map(Into::into)
+        .map_err(Into::into)
+    }
+
    fn fetch(&self, py: Python<'_>, n_rows: usize) -> PyResult<PyDataFrame> {
        let ldf = self.ldf.clone();
        py.enter_polars_df(|| ldf.fetch(n_rows))
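The pure-Python wrapper for this binding is not part of the diff (only the `LazyFrame.sink_iceberg` docs entry below), so the following end-user sketch is an assumption: it presumes the wrapper forwards an Iceberg table object as `dataset_object` (a pyiceberg table would fit the "Currently only used for iceberg" note above) together with the `mode` string.

```python
import polars as pl
from pyiceberg.catalog import load_catalog  # assumed integration point

# Hypothetical usage; the wrapper's signature is assumed from the pyo3
# binding's (dataset_object, mode) parameters.
table = load_catalog("default").load_table("db.events")

lf = pl.scan_parquet("s3://bucket/staging/*.parquet")
lf.sink_iceberg(table, mode="append")
```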
19 changes: 17 additions & 2 deletions crates/polars-python/src/lazyframe/sink.rs
@@ -2,8 +2,8 @@ use std::sync::{Arc, Mutex};

use polars::prelude::sync_on_close::SyncOnCloseType;
use polars::prelude::{
-    PartitionTargetCallbackResult, PartitionVariant, PlPath, SinkFinishCallback, SinkOptions,
-    SortColumn, SpecialEq,
+    IcebergWriteMode, PartitionTargetCallbackResult, PartitionVariant, PlPath, SinkFinishCallback,
+    SinkOptions, SortColumn, SpecialEq,
};
use polars_utils::IdxSize;
use polars_utils::plpath::PlPathRef;
@@ -247,3 +247,18 @@ impl<'py> FromPyObject<'py> for Wrap<SinkOptions> {
        }))
    }
}
+
+impl<'py> FromPyObject<'py> for Wrap<IcebergWriteMode> {
+    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
+        let parsed = match &*ob.extract::<PyBackedStr>()? {
+            "overwrite" => IcebergWriteMode::Overwrite,
+            "append" => IcebergWriteMode::Append,
+            v => {
+                return Err(PyValueError::new_err(format!(
+                    "`mode` must be one of {{'overwrite', 'append'}}, got {v}",
+                )));
+            },
+        };
+        Ok(Wrap(parsed))
+    }
+}
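One consequence of implementing this as a `FromPyObject` conversion: the mode string is validated during pyo3 argument extraction, before any query planning runs. A hedged illustration, assuming the Python wrapper passes `mode` through unchanged:

```python
# Assumed pass-through of `mode`; an invalid value fails at call time:
lf.sink_iceberg(table, mode="upsert")
# ValueError: `mode` must be one of {'overwrite', 'append'}, got upsert
```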
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/io.rst
@@ -86,6 +86,7 @@ Iceberg

    scan_iceberg
    DataFrame.write_iceberg
+   LazyFrame.sink_iceberg

JSON
~~~~
2 changes: 2 additions & 0 deletions py-polars/polars/_typing.py
@@ -466,3 +466,5 @@ def __init__(self, *, arrow_schema: str) -> None:

ParquetMetadataFn: TypeAlias = Callable[[ParquetMetadataContext], dict[str, str]]
ParquetMetadata: TypeAlias = Union[dict[str, str], ParquetMetadataFn]
+
+IcebergWriteMode: TypeAlias = Literal["overwrite", "append"]