kevinjqliu commented on code in PR #1453:
URL: https://github.com/apache/iceberg-python/pull/1453#discussion_r1903446304
##########
pyiceberg/io/pyarrow.py:
##########
@@ -351,77 +351,141 @@ def parse_location(location: str) -> Tuple[str, str, str]:
         return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"

     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
-        if scheme in {"s3", "s3a", "s3n", "oss"}:
-            from pyarrow.fs import S3FileSystem
-
-            client_kwargs: Dict[str, Any] = {
-                "endpoint_override": self.properties.get(S3_ENDPOINT),
-                "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-                "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-                "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-                "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
-            }
-
-            if proxy_uri := self.properties.get(S3_PROXY_URI):
-                client_kwargs["proxy_options"] = proxy_uri
-
-            if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
-                client_kwargs["connect_timeout"] = float(connect_timeout)
-
-            if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
-                client_kwargs["role_arn"] = role_arn
-
-            if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
-                client_kwargs["session_name"] = session_name
-
-            if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
-                client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
-
-            return S3FileSystem(**client_kwargs)
-        elif scheme in ("hdfs", "viewfs"):
-            from pyarrow.fs import HadoopFileSystem
-
-            hdfs_kwargs: Dict[str, Any] = {}
-            if netloc:
-                return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
-            if host := self.properties.get(HDFS_HOST):
-                hdfs_kwargs["host"] = host
-            if port := self.properties.get(HDFS_PORT):
-                # port should be an integer type
-                hdfs_kwargs["port"] = int(port)
-            if user := self.properties.get(HDFS_USER):
-                hdfs_kwargs["user"] = user
-            if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
-                hdfs_kwargs["kerb_ticket"] = kerb_ticket
-
-            return HadoopFileSystem(**hdfs_kwargs)
+        """Initialize FileSystem for different scheme."""
+        if scheme in {"oss"}:
+            return self._initialize_oss_fs()
+
+        elif scheme in {"s3", "s3a", "s3n"}:
+            return self._initialize_s3_fs(netloc)
+
+        elif scheme in {"hdfs", "viewfs"}:
+            return self._initialize_hdfs_fs(scheme, netloc)
+
         elif scheme in {"gs", "gcs"}:
-            from pyarrow.fs import GcsFileSystem
-
-            gcs_kwargs: Dict[str, Any] = {}
-            if access_token := self.properties.get(GCS_TOKEN):
-                gcs_kwargs["access_token"] = access_token
-            if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
-                gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
-            if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
-                gcs_kwargs["default_bucket_location"] = bucket_location
-            if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT):
-                if self.properties.get(GCS_ENDPOINT):
-                    deprecation_message(
-                        deprecated_in="0.8.0",
-                        removed_in="0.9.0",
-                        help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead",
-                    )
-                url_parts = urlparse(endpoint)
-                gcs_kwargs["scheme"] = url_parts.scheme
-                gcs_kwargs["endpoint_override"] = url_parts.netloc
+            return self._initialize_gcs_fs()
+
+        elif scheme in {"file"}:
+            return self._initialize_local_fs()

-            return GcsFileSystem(**gcs_kwargs)
-        elif scheme == "file":
-            return PyArrowLocalFileSystem()
         else:
             raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

+    def _initialize_oss_fs(self) -> FileSystem:
+        from pyarrow.fs import S3FileSystem
+
+        client_kwargs: Dict[str, Any] = {
+            "endpoint_override": self.properties.get(S3_ENDPOINT),
+            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
+        }
+
+        if proxy_uri := self.properties.get(S3_PROXY_URI):
+            client_kwargs["proxy_options"] = proxy_uri
+
+        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
+            client_kwargs["connect_timeout"] = float(connect_timeout)
+
+        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
+            client_kwargs["role_arn"] = role_arn
+
+        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
+            client_kwargs["session_name"] = session_name
+
+        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
+            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import S3FileSystem, resolve_s3_region
+
+        # Resolve region from netloc(bucket), fallback to user-provided region
+        provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION)
+
+        try:
+            bucket_region = resolve_s3_region(netloc)

Review Comment:
```suggestion
            bucket_region = resolve_s3_region(bucket=netloc)
```
make it explicit that the `netloc` variable is the s3 `bucket` name
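For readers following the thread, here is a minimal sketch of the suggested keyword form at the call site. The bucket name and fallback region below are placeholders for illustration, not values from the PR:

```python
from pyarrow.fs import resolve_s3_region

# resolve_s3_region looks up the AWS region a bucket lives in and returns
# it as a string, e.g. "us-east-1". "example-bucket" is a hypothetical name.
try:
    region = resolve_s3_region(bucket="example-bucket")
except OSError:
    # The lookup raises OSError for unreachable or nonexistent buckets;
    # fall back to a user-provided region, mirroring the fallback in this PR.
    region = "us-west-2"
```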
##########
pyiceberg/io/pyarrow.py:
##########
@@ -351,76 +344,146 @@ def parse_location(location: str) -> Tuple[str, str, str]:
         return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"

     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
-        if scheme in {"s3", "s3a", "s3n", "oss"}:
-            from pyarrow.fs import S3FileSystem
+        """Initialize FileSystem for different scheme."""
+        if scheme in {"oss"}:
+            return self._initialize_oss_fs(scheme, netloc)

-            client_kwargs: Dict[str, Any] = {
-                "endpoint_override": self.properties.get(S3_ENDPOINT),
-                "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-                "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-                "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-                "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
-            }
+        elif scheme in {"s3", "s3a", "s3n"}:
+            return self._initialize_s3_fs(scheme, netloc)

-            if proxy_uri := self.properties.get(S3_PROXY_URI):
-                client_kwargs["proxy_options"] = proxy_uri
+        elif scheme in ("hdfs", "viewfs"):
+            return self._initialize_hdfs_fs(scheme, netloc)

-            if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
-                client_kwargs["connect_timeout"] = float(connect_timeout)
+        elif scheme in {"gs", "gcs"}:
+            return self._initialize_gcs_fs(scheme, netloc)

-            if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
-                client_kwargs["role_arn"] = role_arn
+        elif scheme in {"file"}:
+            return self._initialize_local_fs(scheme, netloc)

-            if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
-                client_kwargs["session_name"] = session_name
+        else:
+            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

-            if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
-                client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+    def _initialize_oss_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import S3FileSystem

-            return S3FileSystem(**client_kwargs)
-        elif scheme in ("hdfs", "viewfs"):
-            from pyarrow.fs import HadoopFileSystem
-
-            hdfs_kwargs: Dict[str, Any] = {}
-            if netloc:
-                return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
-            if host := self.properties.get(HDFS_HOST):
-                hdfs_kwargs["host"] = host
-            if port := self.properties.get(HDFS_PORT):
-                # port should be an integer type
-                hdfs_kwargs["port"] = int(port)
-            if user := self.properties.get(HDFS_USER):
-                hdfs_kwargs["user"] = user
-            if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
-                hdfs_kwargs["kerb_ticket"] = kerb_ticket
-
-            return HadoopFileSystem(**hdfs_kwargs)
-        elif scheme in {"gs", "gcs"}:
-            from pyarrow.fs import GcsFileSystem
-
-            gcs_kwargs: Dict[str, Any] = {}
-            if access_token := self.properties.get(GCS_TOKEN):
-                gcs_kwargs["access_token"] = access_token
-            if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
-                gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
-            if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
-                gcs_kwargs["default_bucket_location"] = bucket_location
-            if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT):
-                if self.properties.get(GCS_ENDPOINT):
-                    deprecation_message(
-                        deprecated_in="0.8.0",
-                        removed_in="0.9.0",
-                        help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead",
-                    )
-                url_parts = urlparse(endpoint)
-                gcs_kwargs["scheme"] = url_parts.scheme
-                gcs_kwargs["endpoint_override"] = url_parts.netloc
+        client_kwargs: Dict[str, Any] = {
+            "endpoint_override": self.properties.get(S3_ENDPOINT),
+            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
+        }

-            return GcsFileSystem(**gcs_kwargs)
-        elif scheme == "file":
-            return PyArrowLocalFileSystem()
-        else:
-            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
+        if proxy_uri := self.properties.get(S3_PROXY_URI):
+            client_kwargs["proxy_options"] = proxy_uri
+
+        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
+            client_kwargs["connect_timeout"] = float(connect_timeout)
+
+        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
+            client_kwargs["role_arn"] = role_arn
+
+        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
+            client_kwargs["session_name"] = session_name
+
+        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
+            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_s3_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import S3FileSystem, resolve_s3_region
+
+        # Resolve region from netloc(bucket), fallback to user-provided region
+        provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION)
+
+        try:
+            bucket_region = resolve_s3_region(netloc)
+        except (OSError, TypeError):
+            bucket_region = None
+            logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {provided_region}")
+
+        bucket_region = bucket_region or provided_region
+        if bucket_region != provided_region:
+            logger.warning(
+                f"PyArrow FileIO overriding S3 bucket region for bucket {netloc}: "
+                f"provided region {provided_region}, actual region {bucket_region}"
+            )
+
+        client_kwargs: Dict[str, Any] = {
+            "endpoint_override": self.properties.get(S3_ENDPOINT),
+            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            "region": bucket_region,
+        }
+
+        if proxy_uri := self.properties.get(S3_PROXY_URI):
+            client_kwargs["proxy_options"] = proxy_uri
+
+        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
+            client_kwargs["connect_timeout"] = float(connect_timeout)
+
+        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
+            client_kwargs["role_arn"] = role_arn
+
+        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
+            client_kwargs["session_name"] = session_name
+
+        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
+            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import HadoopFileSystem
+
+        hdfs_kwargs: Dict[str, Any] = {}
+        if netloc:
+            return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
+        if host := self.properties.get(HDFS_HOST):
+            hdfs_kwargs["host"] = host
+        if port := self.properties.get(HDFS_PORT):
+            # port should be an integer type
+            hdfs_kwargs["port"] = int(port)
+        if user := self.properties.get(HDFS_USER):
+            hdfs_kwargs["user"] = user
+        if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
+            hdfs_kwargs["kerb_ticket"] = kerb_ticket
+
+        return HadoopFileSystem(**hdfs_kwargs)
+
+    def _initialize_gcs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import GcsFileSystem
+
+        gcs_kwargs: Dict[str, Any] = {}
+        if access_token := self.properties.get(GCS_TOKEN):
+            gcs_kwargs["access_token"] = access_token
+        if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
+            gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
+        if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
+            gcs_kwargs["default_bucket_location"] = bucket_location
+        if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT):
+            if self.properties.get(GCS_ENDPOINT):
+                deprecation_message(
+                    deprecated_in="0.8.0",
+                    removed_in="0.9.0",
+                    help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead",
+                )

Review Comment:
we can remove it now since the next release using the `main` branch will be for `0.9.0`.
But I'd prefer to remove it in a separate PR, since there are also references to `GCS_ENDPOINT` in fsspec: https://grep.app/search?q=GCS_ENDPOINT&filter[repo][0]=apache/iceberg-python
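As a side note for anyone adjusting configs ahead of that removal, a hedged sketch of the property rename the deprecation message points at; this assumes the constants resolve to the string keys shown, and the endpoint URL is a placeholder:

```python
# Hypothetical FileIO properties illustrating the rename. Assumes
# GCS_ENDPOINT = "gcs.endpoint" and GCS_SERVICE_HOST = "gcs.service.host".
properties = {
    # Deprecated in 0.8.0, slated for removal in 0.9.0:
    # "gcs.endpoint": "http://localhost:4443",
    # Preferred replacement:
    "gcs.service.host": "http://localhost:4443",
}
```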