jiakai-li commented on code in PR #1453:
URL: https://github.com/apache/iceberg-python/pull/1453#discussion_r1903376358


##########
pyiceberg/io/pyarrow.py:
##########
@@ -351,76 +344,146 @@ def parse_location(location: str) -> Tuple[str, str, str]:
             return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
-        if scheme in {"s3", "s3a", "s3n", "oss"}:
-            from pyarrow.fs import S3FileSystem
+        """Initialize FileSystem for different scheme."""
+        if scheme in {"oss"}:
+            return self._initialize_oss_fs(scheme, netloc)
 
-            client_kwargs: Dict[str, Any] = {
-                "endpoint_override": self.properties.get(S3_ENDPOINT),
-                "access_key": get_first_property_value(self.properties, 
S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-                "secret_key": get_first_property_value(self.properties, 
S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-                "session_token": get_first_property_value(self.properties, 
S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-                "region": get_first_property_value(self.properties, S3_REGION, 
AWS_REGION),
-            }
+        elif scheme in {"s3", "s3a", "s3n"}:
+            return self._initialize_s3_fs(scheme, netloc)
 
-            if proxy_uri := self.properties.get(S3_PROXY_URI):
-                client_kwargs["proxy_options"] = proxy_uri
+        elif scheme in ("hdfs", "viewfs"):
+            return self._initialize_hdfs_fs(scheme, netloc)
 
-            if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
-                client_kwargs["connect_timeout"] = float(connect_timeout)
+        elif scheme in {"gs", "gcs"}:
+            return self._initialize_gcs_fs(scheme, netloc)
 
-            if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
-                client_kwargs["role_arn"] = role_arn
+        elif scheme in {"file"}:
+            return self._initialize_local_fs(scheme, netloc)
 
-            if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
-                client_kwargs["session_name"] = session_name
+        else:
+            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
 
-            if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
-                client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+    def _initialize_oss_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import S3FileSystem
 
-            return S3FileSystem(**client_kwargs)
-        elif scheme in ("hdfs", "viewfs"):
-            from pyarrow.fs import HadoopFileSystem
-
-            hdfs_kwargs: Dict[str, Any] = {}
-            if netloc:
-                return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
-            if host := self.properties.get(HDFS_HOST):
-                hdfs_kwargs["host"] = host
-            if port := self.properties.get(HDFS_PORT):
-                # port should be an integer type
-                hdfs_kwargs["port"] = int(port)
-            if user := self.properties.get(HDFS_USER):
-                hdfs_kwargs["user"] = user
-            if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
-                hdfs_kwargs["kerb_ticket"] = kerb_ticket
-
-            return HadoopFileSystem(**hdfs_kwargs)
-        elif scheme in {"gs", "gcs"}:
-            from pyarrow.fs import GcsFileSystem
-
-            gcs_kwargs: Dict[str, Any] = {}
-            if access_token := self.properties.get(GCS_TOKEN):
-                gcs_kwargs["access_token"] = access_token
-            if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
-                gcs_kwargs["credential_token_expiration"] = 
millis_to_datetime(int(expiration))
-            if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
-                gcs_kwargs["default_bucket_location"] = bucket_location
-            if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT):
-                if self.properties.get(GCS_ENDPOINT):
-                    deprecation_message(
-                        deprecated_in="0.8.0",
-                        removed_in="0.9.0",
-                        help_message=f"The property {GCS_ENDPOINT} is 
deprecated, please use {GCS_SERVICE_HOST} instead",
-                    )
-                url_parts = urlparse(endpoint)
-                gcs_kwargs["scheme"] = url_parts.scheme
-                gcs_kwargs["endpoint_override"] = url_parts.netloc
+        client_kwargs: Dict[str, Any] = {
+            "endpoint_override": self.properties.get(S3_ENDPOINT),
+            "access_key": get_first_property_value(self.properties, 
S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            "secret_key": get_first_property_value(self.properties, 
S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            "session_token": get_first_property_value(self.properties, 
S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            "region": get_first_property_value(self.properties, S3_REGION, 
AWS_REGION),
+        }
 
-            return GcsFileSystem(**gcs_kwargs)
-        elif scheme == "file":
-            return PyArrowLocalFileSystem()
-        else:
-            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
+        if proxy_uri := self.properties.get(S3_PROXY_URI):
+            client_kwargs["proxy_options"] = proxy_uri
+
+        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
+            client_kwargs["connect_timeout"] = float(connect_timeout)
+
+        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
+            client_kwargs["role_arn"] = role_arn
+
+        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
+            client_kwargs["session_name"] = session_name
+
+        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
+            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_s3_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import S3FileSystem, resolve_s3_region
+
+        # Resolve the region from the netloc (bucket), falling back to the user-provided region
+        provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION)
+
+        try:
+            bucket_region = resolve_s3_region(netloc)
+        except (OSError, TypeError):
+            bucket_region = None
+            logger.warning(f"Unable to resolve region for bucket {netloc}, 
using default region {provided_region}")
+
+        bucket_region = bucket_region or provided_region
+        if bucket_region != provided_region:
+            logger.warning(
+                f"PyArrow FileIO overriding S3 bucket region for bucket 
{netloc}: "
+                f"provided region {provided_region}, actual region 
{bucket_region}"
+            )
+
+        client_kwargs: Dict[str, Any] = {
+            "endpoint_override": self.properties.get(S3_ENDPOINT),
+            "access_key": get_first_property_value(self.properties, 
S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            "secret_key": get_first_property_value(self.properties, 
S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            "session_token": get_first_property_value(self.properties, 
S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            "region": bucket_region,
+        }
+
+        if proxy_uri := self.properties.get(S3_PROXY_URI):
+            client_kwargs["proxy_options"] = proxy_uri
+
+        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
+            client_kwargs["connect_timeout"] = float(connect_timeout)
+
+        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
+            client_kwargs["role_arn"] = role_arn
+
+        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
+            client_kwargs["session_name"] = session_name
+
+        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
+            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import HadoopFileSystem
+
+        hdfs_kwargs: Dict[str, Any] = {}
+        if netloc:
+            return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
+        if host := self.properties.get(HDFS_HOST):
+            hdfs_kwargs["host"] = host
+        if port := self.properties.get(HDFS_PORT):
+            # port should be an integer type
+            hdfs_kwargs["port"] = int(port)
+        if user := self.properties.get(HDFS_USER):
+            hdfs_kwargs["user"] = user
+        if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
+            hdfs_kwargs["kerb_ticket"] = kerb_ticket
+
+        return HadoopFileSystem(**hdfs_kwargs)
+
+    def _initialize_gcs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import GcsFileSystem
+
+        gcs_kwargs: Dict[str, Any] = {}
+        if access_token := self.properties.get(GCS_TOKEN):
+            gcs_kwargs["access_token"] = access_token
+        if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
+            gcs_kwargs["credential_token_expiration"] = 
millis_to_datetime(int(expiration))
+        if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
+            gcs_kwargs["default_bucket_location"] = bucket_location
+        if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT):
+            if self.properties.get(GCS_ENDPOINT):
+                deprecation_message(
+                    deprecated_in="0.8.0",
+                    removed_in="0.9.0",
+                    help_message=f"The property {GCS_ENDPOINT} is deprecated, 
please use {GCS_SERVICE_HOST} instead",
+                )
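
For reference, the region-resolution fallback that `_initialize_s3_fs` introduces above can be sketched standalone. This is a minimal illustration rather than the PR's code: the bucket name and region are made-up values, and only the `resolve_s3_region` call and the `or`-fallback come from the diff.

```python
# Minimal standalone sketch of the fallback in _initialize_s3_fs above.
# "my-bucket" and "us-east-1" are illustrative, not taken from the PR.
from pyarrow.fs import S3FileSystem, resolve_s3_region

provided_region = "us-east-1"  # what the S3_REGION / AWS_REGION properties would supply

try:
    # Looks up the bucket's actual region with a lightweight S3 request.
    bucket_region = resolve_s3_region("my-bucket")
except (OSError, TypeError):
    # OSError: bucket unreachable; TypeError: netloc was None.
    bucket_region = None

# Prefer the resolved region, falling back to the configured one.
fs = S3FileSystem(region=bucket_region or provided_region)
```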

Review Comment:
   Thank you, Fokko. Do you mean removing the `deprecation_message`, or the `GCS_ENDPOINT` property itself? It says this option will be removed in 0.9.0; is it OK if we remove it now?
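
For context, a minimal sketch of the deprecation fallback under discussion, with the string keys assumed from the constant names in the diff; dropping `GCS_ENDPOINT` support in 0.9.0 would amount to deleting the warning branch and the second lookup:

```python
# Hedged sketch of the deprecated-property fallback shown in the diff above.
# The property keys are assumptions inferred from the constant names.
import warnings
from typing import Dict, Optional

GCS_SERVICE_HOST = "gcs.service.host"  # preferred property (assumed key)
GCS_ENDPOINT = "gcs.endpoint"          # deprecated in 0.8.0, removal slated for 0.9.0


def resolve_gcs_endpoint(properties: Dict[str, str]) -> Optional[str]:
    # Prefer the new key; the deprecated key still works but triggers a warning.
    endpoint = properties.get(GCS_SERVICE_HOST) or properties.get(GCS_ENDPOINT)
    if properties.get(GCS_ENDPOINT):
        warnings.warn(
            f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead",
            DeprecationWarning,
        )
    return endpoint
```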


