Fokko commented on code in PR #1453:
URL: https://github.com/apache/iceberg-python/pull/1453#discussion_r1903352592
##########
pyiceberg/io/pyarrow.py:
##########
@@ -190,13 +190,6 @@ T = TypeVar("T")
 
 
-class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):

Review Comment:
   People could rely on this for other things; I don't think we can just remove this without deprecation.

##########
pyiceberg/io/pyarrow.py:
##########
@@ -351,76 +344,146 @@ def parse_location(location: str) -> Tuple[str, str, str]:
         return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
-        if scheme in {"s3", "s3a", "s3n", "oss"}:
-            from pyarrow.fs import S3FileSystem
+        """Initialize FileSystem for different scheme."""
+        if scheme in {"oss"}:
+            return self._initialize_oss_fs(scheme, netloc)
-            client_kwargs: Dict[str, Any] = {
-                "endpoint_override": self.properties.get(S3_ENDPOINT),
-                "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-                "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-                "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-                "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
-            }
+        elif scheme in {"s3", "s3a", "s3n"}:
+            return self._initialize_s3_fs(scheme, netloc)
-            if proxy_uri := self.properties.get(S3_PROXY_URI):
-                client_kwargs["proxy_options"] = proxy_uri
+        elif scheme in ("hdfs", "viewfs"):
+            return self._initialize_hdfs_fs(scheme, netloc)
-            if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
-                client_kwargs["connect_timeout"] = float(connect_timeout)
+        elif scheme in {"gs", "gcs"}:
+            return self._initialize_gcs_fs(scheme, netloc)
-            if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
-                client_kwargs["role_arn"] = role_arn
+        elif scheme in {"file"}:
+            return self._initialize_local_fs(scheme, netloc)
-            if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
-                client_kwargs["session_name"] = session_name
+        else:
+            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
-            if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
-                client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+    def _initialize_oss_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import S3FileSystem
-            return S3FileSystem(**client_kwargs)
-        elif scheme in ("hdfs", "viewfs"):
-            from pyarrow.fs import HadoopFileSystem
-
-            hdfs_kwargs: Dict[str, Any] = {}
-            if netloc:
-                return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
-            if host := self.properties.get(HDFS_HOST):
-                hdfs_kwargs["host"] = host
-            if port := self.properties.get(HDFS_PORT):
-                # port should be an integer type
-                hdfs_kwargs["port"] = int(port)
-            if user := self.properties.get(HDFS_USER):
-                hdfs_kwargs["user"] = user
-            if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
-                hdfs_kwargs["kerb_ticket"] = kerb_ticket
-
-            return HadoopFileSystem(**hdfs_kwargs)
-        elif scheme in {"gs", "gcs"}:
-            from pyarrow.fs import GcsFileSystem
-
-            gcs_kwargs: Dict[str, Any] = {}
-            if access_token := self.properties.get(GCS_TOKEN):
-                gcs_kwargs["access_token"] = access_token
-            if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
-                gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
-            if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
-                gcs_kwargs["default_bucket_location"] = bucket_location
-            if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT):
-                if self.properties.get(GCS_ENDPOINT):
-                    deprecation_message(
-                        deprecated_in="0.8.0",
-                        removed_in="0.9.0",
-                        help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead",
-                    )
-                url_parts = urlparse(endpoint)
-                gcs_kwargs["scheme"] = url_parts.scheme
-                gcs_kwargs["endpoint_override"] = url_parts.netloc
+        client_kwargs: Dict[str, Any] = {
+            "endpoint_override": self.properties.get(S3_ENDPOINT),
+            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
+        }
-            return GcsFileSystem(**gcs_kwargs)
-        elif scheme == "file":
-            return PyArrowLocalFileSystem()
-        else:
-            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
+        if proxy_uri := self.properties.get(S3_PROXY_URI):
+            client_kwargs["proxy_options"] = proxy_uri
+
+        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
+            client_kwargs["connect_timeout"] = float(connect_timeout)
+
+        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
+            client_kwargs["role_arn"] = role_arn
+
+        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
+            client_kwargs["session_name"] = session_name
+
+        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
+            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_s3_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import S3FileSystem, resolve_s3_region
+
+        # Resolve region from netloc(bucket), fallback to user-provided region
+        provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION)
+
+        try:
+            bucket_region = resolve_s3_region(netloc)
+        except (OSError, TypeError):
+            bucket_region = None
+            logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {provided_region}")
+
+        bucket_region = bucket_region or provided_region
+        if bucket_region != provided_region:
+            logger.warning(
+                f"PyArrow FileIO overriding S3 bucket region for bucket {netloc}: "
+                f"provided region {provided_region}, actual region {bucket_region}"
+            )

Review Comment:
   I like this one, thanks!

##########
pyiceberg/io/pyarrow.py:
##########
@@ -351,76 +344,146 @@ def parse_location(location: str) -> Tuple[str, str, str]:
         return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
-        if scheme in {"s3", "s3a", "s3n", "oss"}:
-            from pyarrow.fs import S3FileSystem
+        """Initialize FileSystem for different scheme."""
+        if scheme in {"oss"}:
+            return self._initialize_oss_fs(scheme, netloc)

Review Comment:
   Do we know if Alibaba doesn't support this?

##########
pyiceberg/io/pyarrow.py:
##########
@@ -190,13 +190,6 @@ T = TypeVar("T")
 
 
-class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):

Review Comment:
   Let's revert this change; I don't like the inline declaration of the class.
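On the deprecation point above: if the class were ever retired, one possible shape is a thin shim that keeps `PyArrowLocalFileSystem` importable while warning callers to migrate. This is a minimal sketch only, using the standard-library `warnings` module as a stand-in for whatever deprecation helper the project prefers; it is not what this PR does, and the class's existing method overrides would stay in place.

```python
import warnings

import pyarrow.fs


class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
    """Hypothetical deprecation shim: keep the public name importable, warn on use."""

    def __init__(self, *args, **kwargs):
        warnings.warn(
            "PyArrowLocalFileSystem is deprecated; use pyarrow.fs.LocalFileSystem instead",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)
```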
##########
pyiceberg/io/pyarrow.py:
##########
@@ -351,76 +344,146 @@ def parse_location(location: str) -> Tuple[str, str, str]:
         return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
-        if scheme in {"s3", "s3a", "s3n", "oss"}:
-            from pyarrow.fs import S3FileSystem
+        """Initialize FileSystem for different scheme."""
+        if scheme in {"oss"}:
+            return self._initialize_oss_fs(scheme, netloc)
-            client_kwargs: Dict[str, Any] = {
-                "endpoint_override": self.properties.get(S3_ENDPOINT),
-                "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-                "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-                "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-                "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
-            }
+        elif scheme in {"s3", "s3a", "s3n"}:
+            return self._initialize_s3_fs(scheme, netloc)
-            if proxy_uri := self.properties.get(S3_PROXY_URI):
-                client_kwargs["proxy_options"] = proxy_uri
+        elif scheme in ("hdfs", "viewfs"):

Review Comment:
   Let's be consistent here:
   ```suggestion
        elif scheme in {"hdfs", "viewfs"}:
   ```

##########
pyiceberg/io/pyarrow.py:
##########
@@ -351,76 +344,146 @@ def parse_location(location: str) -> Tuple[str, str, str]:
         return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
-        if scheme in {"s3", "s3a", "s3n", "oss"}:
-            from pyarrow.fs import S3FileSystem
+        """Initialize FileSystem for different scheme."""
+        if scheme in {"oss"}:
+            return self._initialize_oss_fs(scheme, netloc)
-            client_kwargs: Dict[str, Any] = {
-                "endpoint_override": self.properties.get(S3_ENDPOINT),
-                "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-                "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-                "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-                "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
-            }
+        elif scheme in {"s3", "s3a", "s3n"}:
+            return self._initialize_s3_fs(scheme, netloc)

Review Comment:
   Many of the methods don't require all the parameters; for example, `_initialize_s3_fs` does not use the `scheme`. Should we remove those?

##########
pyiceberg/io/pyarrow.py:
##########
@@ -362,6 +362,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
                 "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
             }
 
+            # Override the default s3.region if netloc(bucket) resolves to a different region
+            try:
+                client_kwargs["region"] = resolve_s3_region(netloc)

Review Comment:
   Thanks for elaborating on this; I want to make sure that the user is aware of it, and I think we do that right with the warning.
   
   For some additional context: for Java we don't have this issue because, when you query the wrong region, the AWS SDK returns an HTTP 301 redirect to the correct region. This introduces an extra call (which then returns a 200), but that's okay. The PyArrow implementation (which I believe uses the AWS C++ SDK underneath) just throws an error that it got a 301. We saw that in the past, for example here: https://github.com/apache/iceberg-python/issues/515#issuecomment-2029207168.
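To illustrate the behaviour discussed above, here is a minimal, self-contained sketch of the lookup-with-fallback pattern, assuming `pyarrow` is installed; the function name, bucket, and default region are invented for the example. `resolve_s3_region` asks S3 which region actually hosts the bucket, so the filesystem is created with the right region up front and never hits the 301 failure mode.

```python
import logging

from pyarrow.fs import S3FileSystem, resolve_s3_region

logger = logging.getLogger(__name__)


def s3_fs_for_bucket(bucket: str, configured_region: str = "us-east-1") -> S3FileSystem:
    # Ask S3 which region hosts the bucket; fall back to the configured region
    # when the lookup fails (no network, missing bucket, etc.).
    try:
        region = resolve_s3_region(bucket)
    except OSError:
        logger.warning("Unable to resolve region for bucket %s, using %s", bucket, configured_region)
        region = configured_region

    if region != configured_region:
        # Surface the override so the user is aware of it, as the PR's warning does.
        logger.warning("Bucket %s is in %s, overriding configured region %s", bucket, region, configured_region)

    # Creating the filesystem with the resolved region avoids the HTTP 301 error
    # that the C++ SDK raises instead of following cross-region redirects.
    return S3FileSystem(region=region)
```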
##########
pyiceberg/io/pyarrow.py:
##########
@@ -351,76 +344,146 @@ def parse_location(location: str) -> Tuple[str, str, str]:
         return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
-        if scheme in {"s3", "s3a", "s3n", "oss"}:
-            from pyarrow.fs import S3FileSystem
+        """Initialize FileSystem for different scheme."""
+        if scheme in {"oss"}:
+            return self._initialize_oss_fs(scheme, netloc)
-            client_kwargs: Dict[str, Any] = {
-                "endpoint_override": self.properties.get(S3_ENDPOINT),
-                "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-                "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-                "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
-                "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
-            }
+        elif scheme in {"s3", "s3a", "s3n"}:
+            return self._initialize_s3_fs(scheme, netloc)
-            if proxy_uri := self.properties.get(S3_PROXY_URI):
-                client_kwargs["proxy_options"] = proxy_uri
+        elif scheme in ("hdfs", "viewfs"):
+            return self._initialize_hdfs_fs(scheme, netloc)
-            if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
-                client_kwargs["connect_timeout"] = float(connect_timeout)
+        elif scheme in {"gs", "gcs"}:
+            return self._initialize_gcs_fs(scheme, netloc)
-            if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
-                client_kwargs["role_arn"] = role_arn
+        elif scheme in {"file"}:
+            return self._initialize_local_fs(scheme, netloc)
-            if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
-                client_kwargs["session_name"] = session_name
+        else:
+            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
-            if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
-                client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+    def _initialize_oss_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import S3FileSystem
-            return S3FileSystem(**client_kwargs)
-        elif scheme in ("hdfs", "viewfs"):
-            from pyarrow.fs import HadoopFileSystem
-
-            hdfs_kwargs: Dict[str, Any] = {}
-            if netloc:
-                return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
-            if host := self.properties.get(HDFS_HOST):
-                hdfs_kwargs["host"] = host
-            if port := self.properties.get(HDFS_PORT):
-                # port should be an integer type
-                hdfs_kwargs["port"] = int(port)
-            if user := self.properties.get(HDFS_USER):
-                hdfs_kwargs["user"] = user
-            if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
-                hdfs_kwargs["kerb_ticket"] = kerb_ticket
-
-            return HadoopFileSystem(**hdfs_kwargs)
-        elif scheme in {"gs", "gcs"}:
-            from pyarrow.fs import GcsFileSystem
-
-            gcs_kwargs: Dict[str, Any] = {}
-            if access_token := self.properties.get(GCS_TOKEN):
-                gcs_kwargs["access_token"] = access_token
-            if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
-                gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
-            if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
-                gcs_kwargs["default_bucket_location"] = bucket_location
-            if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT):
-                if self.properties.get(GCS_ENDPOINT):
-                    deprecation_message(
-                        deprecated_in="0.8.0",
-                        removed_in="0.9.0",
-                        help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead",
-                    )
-                url_parts = urlparse(endpoint)
-                gcs_kwargs["scheme"] = url_parts.scheme
-                gcs_kwargs["endpoint_override"] = url_parts.netloc
+        client_kwargs: Dict[str, Any] = {
+            "endpoint_override": self.properties.get(S3_ENDPOINT),
+            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            "region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
+        }
-            return GcsFileSystem(**gcs_kwargs)
-        elif scheme == "file":
-            return PyArrowLocalFileSystem()
-        else:
-            raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
+        if proxy_uri := self.properties.get(S3_PROXY_URI):
+            client_kwargs["proxy_options"] = proxy_uri
+
+        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
+            client_kwargs["connect_timeout"] = float(connect_timeout)
+
+        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
+            client_kwargs["role_arn"] = role_arn
+
+        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
+            client_kwargs["session_name"] = session_name
+
+        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
+            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_s3_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import S3FileSystem, resolve_s3_region
+
+        # Resolve region from netloc(bucket), fallback to user-provided region
+        provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION)
+
+        try:
+            bucket_region = resolve_s3_region(netloc)
+        except (OSError, TypeError):
+            bucket_region = None
+            logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {provided_region}")
+
+        bucket_region = bucket_region or provided_region
+        if bucket_region != provided_region:
+            logger.warning(
+                f"PyArrow FileIO overriding S3 bucket region for bucket {netloc}: "
+                f"provided region {provided_region}, actual region {bucket_region}"
+            )
+
+        client_kwargs: Dict[str, Any] = {
+            "endpoint_override": self.properties.get(S3_ENDPOINT),
+            "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            "region": bucket_region,
+        }
+
+        if proxy_uri := self.properties.get(S3_PROXY_URI):
+            client_kwargs["proxy_options"] = proxy_uri
+
+        if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT):
+            client_kwargs["connect_timeout"] = float(connect_timeout)
+
+        if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN):
+            client_kwargs["role_arn"] = role_arn
+
+        if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME):
+            client_kwargs["session_name"] = session_name
+
+        if force_virtual_addressing := self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING):
+            client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, force_virtual_addressing, False)
+
+        return S3FileSystem(**client_kwargs)
+
+    def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import HadoopFileSystem
+
+        hdfs_kwargs: Dict[str, Any] = {}
+        if netloc:
+            return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
+        if host := self.properties.get(HDFS_HOST):
+            hdfs_kwargs["host"] = host
+        if port := self.properties.get(HDFS_PORT):
+            # port should be an integer type
+            hdfs_kwargs["port"] = int(port)
+        if user := self.properties.get(HDFS_USER):
+            hdfs_kwargs["user"] = user
+        if kerb_ticket := self.properties.get(HDFS_KERB_TICKET):
+            hdfs_kwargs["kerb_ticket"] = kerb_ticket
+
+        return HadoopFileSystem(**hdfs_kwargs)
+
+    def _initialize_gcs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+        from pyarrow.fs import GcsFileSystem
+
+        gcs_kwargs: Dict[str, Any] = {}
+        if access_token := self.properties.get(GCS_TOKEN):
+            gcs_kwargs["access_token"] = access_token
+        if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
+            gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
+        if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION):
+            gcs_kwargs["default_bucket_location"] = bucket_location
+        if endpoint := get_first_property_value(self.properties, GCS_SERVICE_HOST, GCS_ENDPOINT):
+            if self.properties.get(GCS_ENDPOINT):
+                deprecation_message(
+                    deprecated_in="0.8.0",
+                    removed_in="0.9.0",
+                    help_message=f"The property {GCS_ENDPOINT} is deprecated, please use {GCS_SERVICE_HOST} instead",
+                )

Review Comment:
   Should we remove this one while we're at it?
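On the last question above: if `GCS_ENDPOINT` support were dropped, the endpoint handling could collapse to reading the single remaining property. A small standalone sketch of that shape follows; the helper function and the literal property value are assumptions for illustration, not part of the PR.

```python
from typing import Dict
from urllib.parse import urlparse

# Assumed literal for the GCS_SERVICE_HOST property key; illustration only.
GCS_SERVICE_HOST = "gcs.service.host"


def gcs_endpoint_kwargs(properties: Dict[str, str]) -> Dict[str, str]:
    # Translate the remaining endpoint property into GcsFileSystem kwargs.
    kwargs: Dict[str, str] = {}
    if endpoint := properties.get(GCS_SERVICE_HOST):
        url_parts = urlparse(endpoint)
        kwargs["scheme"] = url_parts.scheme
        kwargs["endpoint_override"] = url_parts.netloc
    return kwargs


print(gcs_endpoint_kwargs({"gcs.service.host": "https://storage.googleapis.com"}))
# {'scheme': 'https', 'endpoint_override': 'storage.googleapis.com'}
```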