Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions crates/polars-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ pub fn get_engine_affinity() -> String {
/// Prints a log message if sensitive verbose logging has been enabled.
pub fn verbose_print_sensitive<F: Fn() -> String>(create_log_message: F) {
fn do_log(create_log_message: &dyn Fn() -> String) {
if std::env::var("POLARS_VERBOSE_SENSITIVE")
.as_deref()
.unwrap_or("")
== "1"
{
if std::env::var("POLARS_VERBOSE_SENSITIVE").as_deref() == Ok("1") {
// Force the message to be a single line.
let msg = create_log_message().replace('\n', "");
eprintln!("[SENSITIVE]: {}", msg)
Expand Down
50 changes: 50 additions & 0 deletions crates/polars-io/src/cloud/credential_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub use object_store::azure::AzureCredential;
pub use object_store::gcp::GcpCredential;
use polars_core::config;
use polars_error::{PolarsResult, polars_bail};
use polars_utils::pl_str::PlSmallStr;
#[cfg(feature = "python")]
use polars_utils::python_function::PythonObject;
#[cfg(feature = "python")]
Expand Down Expand Up @@ -148,6 +149,10 @@ pub trait IntoCredentialProvider: Sized {
fn into_gcp_provider(self) -> object_store::gcp::GcpCredentialProvider {
unimplemented!()
}

/// Note, technically shouldn't be under the `IntoCredentialProvider` trait, but it's here
/// for convenience.
fn storage_update_options(&self) -> PolarsResult<Vec<(PlSmallStr, PlSmallStr)>>;
}

impl IntoCredentialProvider for PlCredentialProvider {
Expand Down Expand Up @@ -177,6 +182,14 @@ impl IntoCredentialProvider for PlCredentialProvider {
Self::Python(v) => v.into_gcp_provider(),
}
}

fn storage_update_options(&self) -> PolarsResult<Vec<(PlSmallStr, PlSmallStr)>> {
match self {
Self::Function(v) => v.storage_update_options(),
#[cfg(feature = "python")]
Self::Python(v) => v.storage_update_options(),
}
}
}

type CredentialProviderFunctionImpl = Arc<
Expand Down Expand Up @@ -298,6 +311,10 @@ impl IntoCredentialProvider for CredentialProviderFunction {
})),
))
}

fn storage_update_options(&self) -> PolarsResult<Vec<(PlSmallStr, PlSmallStr)>> {
Ok(vec![])
}
}

impl Debug for CredentialProviderFunction {
Expand Down Expand Up @@ -462,6 +479,7 @@ mod python_impl {
use std::sync::Arc;

use polars_error::{PolarsError, PolarsResult};
use polars_utils::pl_str::PlSmallStr;
use polars_utils::python_function::PythonObject;
use pyo3::Python;
use pyo3::exceptions::PyValueError;
Expand Down Expand Up @@ -530,6 +548,13 @@ mod python_impl {
}
}

pub(crate) fn unwrap_as_provider_ref(&self) -> &Arc<PythonObject> {
match self {
Self::Builder(_) => panic!(),
Self::Provider(v) => v,
}
}

pub(super) fn func_addr(&self) -> usize {
(match self {
Self::Builder(v) => Arc::as_ptr(v),
Expand Down Expand Up @@ -735,6 +760,31 @@ mod python_impl {
}))
.into_gcp_provider()
}

/// # Panics
/// Panics if `self` is not an initialized provider.
fn storage_update_options(&self) -> PolarsResult<Vec<(PlSmallStr, PlSmallStr)>> {
let py_object = self.unwrap_as_provider_ref();

Python::with_gil(|py| {
py_object
.getattr(py, "_storage_update_options")
.map_or(Ok(vec![]), |f| {
let v = f.call0(py)?.extract::<pyo3::Bound<'_, PyDict>>(py)?;

let mut out = Vec::with_capacity(v.len());

for dict_item in v.call_method0("items")?.try_iter()? {
let (key, value) =
dict_item?.extract::<(PyBackedStr, PyBackedStr)>()?;

out.push(((&*key).into(), (&*value).into()))
}

Ok(out)
})
})
}
}

// Note: We don't consider `is_builder` for hash/eq - we don't expect the same Arc<PythonObject>
Expand Down
57 changes: 47 additions & 10 deletions crates/polars-io/src/cloud/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub(crate) fn try_build_http_header_map_from_items_slice<S: AsRef<str>>(

#[allow(dead_code)]
/// Parse an untype configuration hashmap to a typed configuration for the given configuration key type.
fn parsed_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
fn parse_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
config: I,
) -> PolarsResult<Configs<T>>
where
Expand Down Expand Up @@ -285,10 +285,25 @@ impl CloudOptions {
pub async fn build_aws(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
use super::credential_provider::IntoCredentialProvider;

let opt_credential_provider = self.initialized_credential_provider()?;

let mut builder = AmazonS3Builder::from_env()
.with_client_options(get_client_options())
.with_url(url);

if let Some(credential_provider) = &opt_credential_provider {
let storage_update_options = parse_untyped_config::<AmazonS3ConfigKey, _>(
credential_provider
.storage_update_options()?
.into_iter()
.map(|(k, v)| (k, v.to_string())),
)?;

for (key, value) in storage_update_options {
builder = builder.with_config(key, value);
}
}

read_config(
&mut builder,
&[(
Expand Down Expand Up @@ -322,7 +337,7 @@ impl CloudOptions {
let CloudConfig::Aws(options) = options else {
panic!("impl error: cloud type mismatch")
};
for (key, value) in options.iter() {
for (key, value) in options {
builder = builder.with_config(*key, value);
}
}
Expand Down Expand Up @@ -380,8 +395,30 @@ impl CloudOptions {

let builder = builder.with_retry(get_retry_config(self.max_retries));

let builder = if let Some(v) = self.initialized_credential_provider()? {
builder.with_credentials(v.into_aws_provider())
let opt_credential_provider = match opt_credential_provider {
#[cfg(feature = "python")]
Some(PlCredentialProvider::Python(object)) => {
if pyo3::Python::with_gil(|py| {
let Ok(func_object) = object
.unwrap_as_provider_ref()
.getattr(py, "_can_use_as_provider")
else {
return PolarsResult::Ok(true);
};

Ok(func_object.call0(py)?.extract::<bool>(py).unwrap())
})? {
Some(PlCredentialProvider::Python(object))
} else {
None
}
},

v => v,
};

let builder = if let Some(credential_provider) = opt_credential_provider {
builder.with_credentials(credential_provider.into_aws_provider())
} else {
builder
};
Expand Down Expand Up @@ -524,7 +561,7 @@ impl CloudOptions {
CloudType::Aws => {
#[cfg(feature = "aws")]
{
parsed_untyped_config::<AmazonS3ConfigKey, _>(config)
parse_untyped_config::<AmazonS3ConfigKey, _>(config)
.map(|aws| Self::default().with_aws(aws))
}
#[cfg(not(feature = "aws"))]
Expand All @@ -535,7 +572,7 @@ impl CloudOptions {
CloudType::Azure => {
#[cfg(feature = "azure")]
{
parsed_untyped_config::<AzureConfigKey, _>(config)
parse_untyped_config::<AzureConfigKey, _>(config)
.map(|azure| Self::default().with_azure(azure))
}
#[cfg(not(feature = "azure"))]
Expand All @@ -548,7 +585,7 @@ impl CloudOptions {
CloudType::Gcp => {
#[cfg(feature = "gcp")]
{
parsed_untyped_config::<GoogleConfigKey, _>(config)
parse_untyped_config::<GoogleConfigKey, _>(config)
.map(|gcp| Self::default().with_gcp(gcp))
}
#[cfg(not(feature = "gcp"))]
Expand Down Expand Up @@ -644,7 +681,7 @@ impl CloudOptions {
mod tests {
use hashbrown::HashMap;

use super::{parse_url, parsed_untyped_config};
use super::{parse_untyped_config, parse_url};

#[test]
fn test_parse_url() {
Expand Down Expand Up @@ -730,7 +767,7 @@ mod tests {
]
.into_iter()
.collect::<HashMap<_, _>>();
let aws_keys = parsed_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
let aws_keys = parse_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
.expect("Parsing keys shouldn't have thrown an error");

assert_eq!(
Expand All @@ -745,7 +782,7 @@ mod tests {
]
.into_iter()
.collect::<HashMap<_, _>>();
let aws_keys = parsed_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
let aws_keys = parse_untyped_config::<AmazonS3ConfigKey, _>(aws_config)
.expect("Parsing keys shouldn't have thrown an error");

assert_eq!(
Expand Down
5 changes: 3 additions & 2 deletions py-polars/polars/_utils/logging.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import sys
from functools import partial
from typing import Any


def verbose() -> bool:
return os.getenv("POLARS_VERBOSE") == "1"


eprint = partial(print, file=sys.stderr)
def eprint(*a: Any, **kw: Any) -> None:
return print(*a, file=sys.stderr, **kw)
Copy link
Copy Markdown
Collaborator Author

@nameexhaustion nameexhaustion May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by, ensure eprint does not capture sys.stderr upon import, this caused pytest capfd to not work as it presumably relies on patching sys.stderr (pytest-dev/pytest#5997)

65 changes: 16 additions & 49 deletions py-polars/polars/io/cloud/credential_provider/_builder.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from __future__ import annotations

import abc
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Literal
from typing import TYPE_CHECKING, Any, Literal

import polars._utils.logging
from polars._utils.logging import eprint, verbose
Expand Down Expand Up @@ -178,43 +177,6 @@ def provider_repr(self) -> str:
return self.cls.__name__


# AWS auto-init needs its own class for a bit of extra logic.
class AutoInitAWS(CredentialProviderBuilderImpl):
def __init__(
self,
initializer: Callable[[], CredentialProviderAWS],
) -> None:
self.initializer = initializer
self.profile_name = initializer.keywords["profile_name"] # type: ignore[attr-defined]

def __call__(self) -> CredentialProviderAWS | None:
try:
provider = self.initializer()
provider() # call it to potentially catch EmptyCredentialError

except (ImportError, CredentialProviderAWS.EmptyCredentialError) as e:
# Check it is ImportError, EmptyCredentialError could be because the
# profile was loaded but did not contain any credentials.
if isinstance(e, ImportError) and self.profile_name:
# Hard error as we are unable to load the requested profile
# without CredentialProviderAWS (the rust-side does not load
# aws_profile).
msg = f"cannot load requested aws_profile '{self.profile_name}': {e!r}"
raise polars.exceptions.ComputeError(msg) from e

if verbose():
eprint(f"failed to auto-initialize {self.provider_repr}: {e!r}")

else:
return provider

return None

@property
def provider_repr(self) -> str:
return "CredentialProviderAWS"


class UserProvidedGCPToken(CredentialProvider):
"""User-provided GCP token in storage_options."""

Expand Down Expand Up @@ -318,6 +280,7 @@ def f() -> CredentialProviderBuilder | None:
profile = None
default_region = None
unhandled_key = None
has_endpoint_url = False

if storage_options is not None:
for k, v in storage_options.items():
Expand All @@ -330,11 +293,17 @@ def f() -> CredentialProviderBuilder | None:
default_region = v
elif k in {"aws_profile", "profile"}:
profile = v
elif k in {
"aws_endpoint",
"aws_endpoint_url",
"endpoint",
"endpoint_url",
}:
has_endpoint_url = True
elif k in OBJECT_STORE_CLIENT_OPTIONS:
continue
else:
# We assume some sort of access key was given, so we
# just dispatch to the rust side.
# We assume this is some sort of access key
unhandled_key = k

if unhandled_key is not None:
Expand All @@ -345,15 +314,13 @@ def f() -> CredentialProviderBuilder | None:
)
raise ValueError(msg)

return None

return CredentialProviderBuilder(
AutoInitAWS(
partial(
CredentialProviderAWS,
profile_name=profile,
region_name=region or default_region,
)
AutoInit(
CredentialProviderAWS,
profile_name=profile,
region_name=region or default_region,
_auto_init_unhandled_key=unhandled_key,
_storage_options_has_endpoint_url=has_endpoint_url,
)
)

Expand Down
Loading
Loading