dabla commented on code in PR #62867:
URL: https://github.com/apache/airflow/pull/62867#discussion_r3008284313


##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/object_storage_provider.py:
##########
@@ -70,12 +70,126 @@ def get_scheme(self) -> str:
         return "file://"
 
 
+class GCSObjectStorageProvider(ObjectStorageProvider):

Review Comment:
   Here the same, that class should be part of the google provider, **NOT** of the common-sql provider.
   
   WDYT of this @potiuk ?



##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/object_storage_provider.py:
##########
@@ -70,12 +70,126 @@ def get_scheme(self) -> str:
         return "file://"
 
 
+class GCSObjectStorageProvider(ObjectStorageProvider):
+    """GCS Object Storage Provider using DataFusion's GoogleCloud."""
+
+    @property
+    def get_storage_type(self) -> StorageType:
+        """Return the storage type."""
+        return StorageType.GCS
+
+    def create_object_store(self, path: str, connection_config: ConnectionConfig | None = None):
+        """
+        Create a GCS object store using DataFusion's GoogleCloud.
+
+        Supported auth modes (in priority order):
+
+        - ``key_path`` / ``keyfile_dict``: resolved to a file path via
+          ``provide_gcp_credential_file_as_context`` and passed as ``service_account_path``.
+        - ``credential_config_file``: passed as ``service_account_path``. If the value is an
+          inline JSON string or dict it is written to a temporary file first.
+        - No credentials set: DataFusion falls back to Application Default Credentials (ADC).
+
+        Not supported: ``key_secret_name`` (Secret Manager) and ``impersonation_chain`` — these
+        require capabilities outside DataFusion's ``GoogleCloud`` object-store API.
+        """
+        if connection_config is None:
+            raise ValueError(f"connection_config must be provided for {self.get_storage_type}")
+
+        try:
+            import json
+            import os
+            import tempfile
+
+            from airflow.providers.google.common.hooks.base_google import GoogleBaseHook, get_field
+
+            bucket = self.get_bucket(path)
+            gcp_hook = GoogleBaseHook(gcp_conn_id=connection_config.conn_id)
+
+            if get_field(gcp_hook.extras, "key_secret_name"):
+                raise ValueError(
+                    "GCS auth mode 'key_secret_name' (Secret Manager) is not supported by the "
+                    "DataFusion object store. Use 'key_path' or 'keyfile_dict' instead."
+                )
+
+            with gcp_hook.provide_gcp_credential_file_as_context() as cred_file:
+                if cred_file is not None:
+                    # key_path or keyfile_dict
+                    gcs_store = GoogleCloud(service_account_path=cred_file, bucket_name=bucket)
+                else:
+                    credential_config_file = get_field(gcp_hook.extras, "credential_config_file")
+                    if credential_config_file is not None:
+                        # Workload Identity Federation via credential_config_file
+                        if isinstance(credential_config_file, str) and os.path.exists(credential_config_file):
+                            gcs_store = GoogleCloud(
+                                service_account_path=credential_config_file,
+                                bucket_name=bucket,
+                            )
+                        else:
+                            # Inline dict or JSON string — write to temp file
+                            with tempfile.NamedTemporaryFile(mode="w", suffix=".json") as tmp:
+                                content = (
+                                    json.dumps(credential_config_file)
+                                    if isinstance(credential_config_file, dict)
+                                    else credential_config_file
+                                )
+                                tmp.write(content)
+                                tmp.flush()
+                                gcs_store = GoogleCloud(service_account_path=tmp.name, bucket_name=bucket)
+                    else:
+                        # ADC fallback
+                        gcs_store = GoogleCloud(bucket_name=bucket)
+
+                self.log.info("Created GCS object store for bucket %s", bucket)
+                return gcs_store
+
+        except Exception as e:
+            raise ObjectStoreCreationException(f"Failed to create GCS object store: {e}")
+
+    def get_scheme(self) -> str:
+        """Return the scheme for GCS."""
+        return "gs://"
+
+
+class AzureObjectStorageProvider(ObjectStorageProvider):

Review Comment:
   Same here, this should be part of the Microsoft Azure provider.



##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -169,6 +158,65 @@ def _fetch_extra_configs(keys: list[str]) -> dict[str, 
Any]:
                 credentials = self._remove_none_values(credentials)
                 extra_config = _fetch_extra_configs(["region", "endpoint"])
 
+            case "google_cloud_platform":
+                try:
+                    # Imported as a feature gate only: verifies the Google provider is installed.
+                    from airflow.providers.google.common.hooks.base_google import GoogleBaseHook  # noqa: F401
+                except ImportError:
+                    from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException

Review Comment:
   Not related to this PR, but to be honest I don't like this implementation. It creates multiple dependencies between providers (e.g. google/aws/...), and thus makes the common.sql provider indirectly tied to other providers, even though this only happens at runtime.
   
   I think we should take a cleaner approach here, like I did with the SQL dialects: a similar mechanism in which each provider registers itself as a DataFusion-capable provider. Even though here it's only for credentials, it would be nicer if this weren't hardcoded with an if/else case structure.
   
   The SQL dialects, for example, register themselves per provider during provider registration, so that the common-sql provider can look up the dialect dynamically; common-sql has no knowledge of any specific provider regarding dialects. It would be nice to have the same here, instead of the case structure.
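
   A registration mechanism along those lines (mirroring how SQL dialects register themselves during provider registration) could look roughly like the sketch below. Everything here is hypothetical, not an existing Airflow API: the registry, `register_datafusion_credentials_provider`, and the example google resolver are illustrations of the idea only:

```python
from collections.abc import Callable
from typing import Any

# Hypothetical registry living in common-sql. Each provider package registers
# its own credentials resolver at provider-registration time, so common-sql
# never imports google/amazon/... modules directly.
_DATAFUSION_CREDENTIALS_PROVIDERS: dict[str, Callable[[Any], dict[str, Any]]] = {}


def register_datafusion_credentials_provider(
    conn_type: str, resolver: Callable[[Any], dict[str, Any]]
) -> None:
    """Called by a provider (e.g. google) when it is registered."""
    _DATAFUSION_CREDENTIALS_PROVIDERS[conn_type] = resolver


def get_credentials(conn_type: str, connection: Any) -> dict[str, Any]:
    """Replacement for the hardcoded case structure: a dynamic lookup."""
    try:
        resolver = _DATAFUSION_CREDENTIALS_PROVIDERS[conn_type]
    except KeyError:
        raise ValueError(f"Unknown connection type {conn_type}") from None
    return resolver(connection)


# Example of what the google provider might register for itself.
def _google_resolver(connection: Any) -> dict[str, Any]:
    return {"service_account_path": getattr(connection, "key_path", None)}


register_datafusion_credentials_provider("google_cloud_platform", _google_resolver)
```

   With something like this in place, the case structure in `engine.py` collapses to a single dynamic lookup, and common-sql keeps no knowledge of any specific provider.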



##########
providers/common/sql/tests/unit/common/sql/datafusion/test_engine.py:
##########
@@ -289,6 +252,155 @@ def test_get_credentials_unknown_type(self):
         with pytest.raises(ValueError, match="Unknown connection type dummy"):
             engine._get_credentials(mock_conn)
 
+    def test_get_credentials_google_cloud_platform(self):

Review Comment:
   The tests should then also move to their corresponding providers accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
