Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Jan 10, 2025
1 parent 2e0b3a3 commit b4ac1af
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 101 deletions.
94 changes: 79 additions & 15 deletions crates/polars-io/src/cloud/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ impl CloudOptions {
/// Build the [`object_store::ObjectStore`] implementation for Azure.
#[cfg(feature = "azure")]
pub fn build_azure(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
use futures::TryStreamExt;
use object_store::ObjectStore;

use super::credential_provider::IntoCredentialProvider;

let verbose = polars_core::config::verbose();
Expand All @@ -424,6 +427,14 @@ impl CloudOptions {
.with_url(url)
.with_retry(get_retry_config(self.max_retries));

let opt_account_key = extract_adls_uri_storage_account(url) // Prefer the one embedded in the path
.map(|x| x.into())
.or(storage_account)
.as_deref()
.and_then(get_azure_storage_account_key);

let builder_copy = opt_account_key.is_some().then(|| builder.clone());

let builder = if let Some(v) = self.credential_provider.clone() {
if verbose {
eprintln!(
Expand All @@ -432,26 +443,79 @@ impl CloudOptions {
);
}
builder.with_credentials(v.into_azure_provider())
} else if let Some(v) = extract_adls_uri_storage_account(url) // Prefer the one embedded in the path
.map(|x| x.into())
.or(storage_account)
.as_deref()
.and_then(get_azure_storage_account_key)
{
if verbose {
eprintln!("[CloudOptions::build_azure]: Retrieved account key from Azure CLI")
}
builder.with_access_key(v)
} else {
builder
};

if verbose {
eprintln!(
"[CloudOptions::build_azure]: opt_account_key: {:?}",
opt_account_key.as_ref().map(|_| "<redacted>")
);
}

let store = builder.build().map_err(to_compute_err)?;

let Some(account_key) = opt_account_key else {
return Ok(store);
};

// We do `list()` instead of `head(path)` as it could be a glob path.
let success = crate::pl_async::get_runtime()
.block_on_potential_spawn(async { store.list(None).try_next().await.is_ok() });

if success {
if verbose {
eprintln!("[CloudOptions::build_azure]: Success on list() without account key");
}
return Ok(store);
}

let cfg_use_account_key = std::env::var("POLARS_ALLOW_AZURE_ACCOUNT_KEY");
let cfg_use_account_key = cfg_use_account_key.as_deref();

if verbose {
eprintln!(
"[CloudOptions::build_azure]: Err on list(), cfg_use_account_key={:?}",
cfg_use_account_key
);
}

if cfg_use_account_key != Ok("1") {
use std::sync::Once;
static HINT: Once = Once::new();

if cfg_use_account_key != Ok("0") {
HINT.call_once(|| {
eprintln!(
"
[CloudOptions::build_azure]: Warning: Azure authentication check failed, \
subsequent cloud operations may return an error. Polars was able to retrieve \
the storage account key for this URL - to allow polars to use storage account \
keys for authentication, set POLARS_ALLOW_AZURE_ACCOUNT_KEY=1 in the \
environment. To silence this warning, set POLARS_ALLOW_AZURE_ACCOUNT_KEY=0.
"
)
});
} else if verbose {
eprintln!(
"[CloudOptions::build_azure]: Could not retrieve account key from Azure CLI"
)
"[CloudOptions::build_azure]: Storage account key authentication disabled \
(POLARS_ALLOW_AZURE_ACCOUNT_KEY=0)"
);
}
builder
};

builder.build().map_err(to_compute_err)
return Ok(store);
}

if verbose {
eprintln!("[CloudOptions::build_azure]: Using storage account key authentication");
}

builder_copy
.unwrap()
.with_access_key(account_key)
.build()
.map_err(to_compute_err)
}

/// Set the configuration for GCP connections. This is the preferred API from rust.
Expand Down
92 changes: 6 additions & 86 deletions py-polars/polars/io/cloud/credential_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ def __init__(
self,
*,
scopes: list[str] | None = None,
storage_account: str | None = None,
tenant_id: str | None = None,
_verbose: bool = False,
) -> None:
Expand All @@ -169,11 +168,6 @@ def __init__(
----------
scopes
Scopes to pass to `get_token`
storage_account
If specified, an attempt will be made to retrieve the account keys
for this account using the Azure CLI. If this is successful, the
account keys will be used instead of
`DefaultAzureCredential.get_token()`
tenant_id
Azure tenant ID.
"""
Expand All @@ -182,7 +176,6 @@ def __init__(

self._check_module_availability()

self.account_name = storage_account
self.tenant_id = tenant_id
# Done like this to bypass mypy, we don't have stubs for azure.identity
self.credential = importlib.import_module("azure.identity").__dict__[
Expand All @@ -195,7 +188,6 @@ def __init__(
print(
(
"CredentialProviderAzure "
f"{self.account_name = } "
f"{self.tenant_id = } "
f"{self.scopes = } "
),
Expand All @@ -204,28 +196,6 @@ def __init__(

def __call__(self) -> CredentialProviderFunctionReturn:
"""Fetch the credentials."""
if self.account_name is not None:
try:
creds = {
"account_key": self._get_azure_storage_account_key_az_cli(
self.account_name
)
}

if self._verbose:
print(
"[CredentialProviderAzure]: Retrieved account key from Azure CLI",
file=sys.stderr,
)
except Exception as e:
if self._verbose:
print(
f"[CredentialProviderAzure]: Could not retrieve account key from Azure CLI: {e}",
file=sys.stderr,
)
else:
return creds, None # type: ignore[return-value]

token = self.credential.get_token(*self.scopes, tenant_id=self.tenant_id)

return {
Expand All @@ -238,51 +208,6 @@ def _check_module_availability(cls) -> None:
msg = "azure-identity must be installed to use `CredentialProviderAzure`"
raise ImportError(msg)

@staticmethod
def _extract_adls_uri_storage_account(uri: str) -> str | None:
# "abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/"
# ^^^^^^^^^^^^^^^^^
try:
return (
uri.split("://", 1)[1]
.split("/", 1)[0]
.split("@", 1)[1]
.split(".dfs.core.windows.net", 1)[0]
)

except IndexError:
return None

@classmethod
def _get_azure_storage_account_key_az_cli(cls, account_name: str) -> str:
# [
# {
# "creationTime": "1970-01-01T00:00:00.000000+00:00",
# "keyName": "key1",
# "permissions": "FULL",
# "value": "..."
# },
# {
# "creationTime": "1970-01-01T00:00:00.000000+00:00",
# "keyName": "key2",
# "permissions": "FULL",
# "value": "..."
# }
# ]

return json.loads(
cls._azcli(
"storage",
"account",
"keys",
"list",
"--output",
"json",
"--account-name",
account_name,
)
)[0]["value"]

@classmethod
def _azcli_version(cls) -> str | None:
try:
Expand Down Expand Up @@ -421,7 +346,6 @@ def _maybe_init_credential_provider(
# For Azure we dispatch to `azure.identity` as much as possible
if _is_azure_cloud(scheme):
tenant_id = None
storage_account = None

if storage_options is not None:
for k, v in storage_options.items():
Expand All @@ -435,23 +359,19 @@ def _maybe_init_credential_provider(
"authority_id",
}:
tenant_id = v
elif k in {"azure_storage_account_name", "account_name"}:
storage_account = v
elif k in {"azure_use_azure_cli", "use_azure_cli"}:
elif k in {
"azure_storage_account_name",
"account_name",
"azure_use_azure_cli",
"use_azure_cli",
}:
continue
else:
# We assume some sort of access key was given, so we
# just dispatch to the rust side.
return None

storage_account = (
# Prefer the one embedded in the path
CredentialProviderAzure._extract_adls_uri_storage_account(str(path))
or storage_account
)

provider = CredentialProviderAzure(
storage_account=storage_account,
tenant_id=tenant_id,
_verbose=verbose,
)
Expand Down

0 comments on commit b4ac1af

Please sign in to comment.