dabla commented on code in PR #62867:
URL: https://github.com/apache/airflow/pull/62867#discussion_r3008284313
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/object_storage_provider.py:
##########
@@ -70,12 +70,126 @@ def get_scheme(self) -> str:
         return "file://"
+class GCSObjectStorageProvider(ObjectStorageProvider):
Review Comment:
Same here: that class should be part of the google provider, **NOT** the
common-sql provider.
WDYT, @potiuk?
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/object_storage_provider.py:
##########
@@ -70,12 +70,126 @@ def get_scheme(self) -> str:
         return "file://"
+class GCSObjectStorageProvider(ObjectStorageProvider):
+    """GCS Object Storage Provider using DataFusion's GoogleCloud."""
+
+    @property
+    def get_storage_type(self) -> StorageType:
+        """Return the storage type."""
+        return StorageType.GCS
+
+    def create_object_store(self, path: str, connection_config: ConnectionConfig | None = None):
+        """
+        Create a GCS object store using DataFusion's GoogleCloud.
+
+        Supported auth modes (in priority order):
+
+        - ``key_path`` / ``keyfile_dict``: resolved to a file path via
+          ``provide_gcp_credential_file_as_context`` and passed as ``service_account_path``.
+        - ``credential_config_file``: passed as ``service_account_path``. If the value is an
+          inline JSON string or dict it is written to a temporary file first.
+        - No credentials set: DataFusion falls back to Application Default Credentials (ADC).
+
+        Not supported: ``key_secret_name`` (Secret Manager) and ``impersonation_chain``; these
+        require capabilities outside DataFusion's ``GoogleCloud`` object-store API.
+        """
+        if connection_config is None:
+            raise ValueError(f"connection_config must be provided for {self.get_storage_type}")
+
+        try:
+            import json
+            import os
+            import tempfile
+
+            from airflow.providers.google.common.hooks.base_google import GoogleBaseHook, get_field
+
+            bucket = self.get_bucket(path)
+            gcp_hook = GoogleBaseHook(gcp_conn_id=connection_config.conn_id)
+
+            if get_field(gcp_hook.extras, "key_secret_name"):
+                raise ValueError(
+                    "GCS auth mode 'key_secret_name' (Secret Manager) is not supported by the "
+                    "DataFusion object store. Use 'key_path' or 'keyfile_dict' instead."
+                )
+
+            with gcp_hook.provide_gcp_credential_file_as_context() as cred_file:
+                if cred_file is not None:
+                    # key_path or keyfile_dict
+                    gcs_store = GoogleCloud(service_account_path=cred_file, bucket_name=bucket)
+                else:
+                    credential_config_file = get_field(gcp_hook.extras, "credential_config_file")
+                    if credential_config_file is not None:
+                        # Workload Identity Federation via credential_config_file
+                        if isinstance(credential_config_file, str) and os.path.exists(credential_config_file):
+                            gcs_store = GoogleCloud(
+                                service_account_path=credential_config_file,
+                                bucket_name=bucket,
+                            )
+                        else:
+                            # Inline dict or JSON string: write to a temp file
+                            with tempfile.NamedTemporaryFile(mode="w", suffix=".json") as tmp:
+                                content = (
+                                    json.dumps(credential_config_file)
+                                    if isinstance(credential_config_file, dict)
+                                    else credential_config_file
+                                )
+                                tmp.write(content)
+                                tmp.flush()
+                                gcs_store = GoogleCloud(service_account_path=tmp.name, bucket_name=bucket)
+                    else:
+                        # ADC fallback
+                        gcs_store = GoogleCloud(bucket_name=bucket)
+
+            self.log.info("Created GCS object store for bucket %s", bucket)
+            return gcs_store
+
+        except Exception as e:
+            raise ObjectStoreCreationException(f"Failed to create GCS object store: {e}")
+
+    def get_scheme(self) -> str:
+        """Return the scheme for GCS."""
+        return "gs://"
+
+
+class AzureObjectStorageProvider(ObjectStorageProvider):
Review Comment:
Same here, this should be part of the Microsoft Azure provider.
##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -169,6 +158,65 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
+            case "google_cloud_platform":
+                try:
+                    # Imported as a feature gate only: verifies the Google provider is installed.
+                    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook  # noqa: F401
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException
Review Comment:
Not related to this PR, but to be honest I don't like this implementation.
It creates dependencies between providers (e.g. google/aws/...), which ties
the common-sql provider indirectly to other providers, even though this only
happens at runtime.
I think we should take a cleaner approach here, like I did with the SQL
dialects: each provider should register itself as a DataFusion-capable
provider. Even though here it's only about credentials, it would be nicer if
this weren't hardcoded with an if/else case structure.
The SQL dialects, for example, are registered per provider during provider
registration, so the common-sql provider can look up the dialect dynamically;
common-sql has no knowledge of any specific provider regarding dialects. It
would be nice to have the same here, instead of the case structure.
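A minimal sketch of what such a registry could look like (hypothetical names; `register_credentials_provider` and `get_credentials` below are illustrations, not existing Airflow APIs):

```python
from __future__ import annotations

from typing import Any, Callable

# Hypothetical registry: conn_type -> factory that builds DataFusion credentials.
# Each provider (google, amazon, ...) would populate this during provider
# registration, so common-sql never imports provider-specific modules.
_CREDENTIALS_PROVIDERS: dict[str, Callable[[Any], dict]] = {}


def register_credentials_provider(conn_type: str, factory: Callable[[Any], dict]) -> None:
    """Called by each provider at registration time to announce support."""
    _CREDENTIALS_PROVIDERS[conn_type] = factory


def get_credentials(conn_type: str, conn: Any) -> dict:
    """Replaces the hardcoded match/case over connection types in common-sql."""
    try:
        factory = _CREDENTIALS_PROVIDERS[conn_type]
    except KeyError:
        raise ValueError(f"Unknown connection type {conn_type}") from None
    return factory(conn)


# Example: the google provider would register its own factory on load.
register_credentials_provider(
    "google_cloud_platform",
    lambda conn: {"service_account_path": conn.extra_dejson.get("key_path")},
)
```

This mirrors the dialect mechanism: common-sql only does a dynamic lookup, and the if/else case structure disappears.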
##########
providers/common/sql/tests/unit/common/sql/datafusion/test_engine.py:
##########
@@ -289,6 +252,155 @@ def test_get_credentials_unknown_type(self):
         with pytest.raises(ValueError, match="Unknown connection type dummy"):
             engine._get_credentials(mock_conn)
+    def test_get_credentials_google_cloud_platform(self):
Review Comment:
Tests should then also be moved to their corresponding provider.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]