Fokko commented on code in PR #330: URL: https://github.com/apache/iceberg-python/pull/330#discussion_r1473948988
########## pyiceberg/table/__init__.py: ########## @@ -791,6 +792,76 @@ class CommitTableResponse(IcebergBaseModel): metadata_location: str = Field(alias="metadata-location") +class TableCommitRetry: + """Decorator for building the table commit retry controller.""" + + num_retries = "commit.retry.num-retries" + num_retries_default: int = 4 + min_wait_ms = "commit.retry.min-wait-ms" + min_wait_ms_default: int = 100 + max_wait_ms = "commit.retry.max-wait-ms" + max_wait_ms_default: int = 60000 # 1 min + total_timeout_ms = "commit.retry.total-timeout-ms" + total_timeout_ms_default: int = 1800000 # 30 mins + + def __init__(self, func: Callable[..., Any], properties_attribute: str = "properties") -> None: + self.properties_attr: str = properties_attribute + self.func: Callable[..., Any] = func + self.loaded_properties: Properties = {} + + def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]: + """Return the __call__ method with the instance caller.""" + return partial(self.__call__, instance) + + def __call__(self, instance: Any, *args: Any, **kwargs: Any) -> Any: + """Run function with the retrying controller on the caller instance.""" + self.loaded_properties = getattr(instance, self.properties_attr) + try: + for attempt in self.build_retry_controller(): + with attempt: + result = self.func(instance, *args, **kwargs) + except RetryError as err: + raise Exception from err.reraise() + else: + return result + + @property + def table_properties(self) -> Properties: + """Get the table properties from the instance that is calling this decorator.""" + return self.loaded_properties Review Comment: nit: More on the style, I would probably leave out this property, since it has the same signature as `loaded_properties`. Therefore the table properties map has two names (`loaded_properties` and `table_properties`). 
########## pyiceberg/table/__init__.py: ########## @@ -994,6 +1065,7 @@ def refs(self) -> Dict[str, SnapshotRef]: """Return the snapshot references in the table.""" return self.metadata.refs + @table_commit_retry("properties") Review Comment: It depends on what we are trying to do here. There are two types of retries that we want to support: - Intermittent network issues, catalog temporarily not available, etc. - Retrying of commits because the table changed. The first one should probably be done on the catalog level because we also need to differentiate between the different errors, and [see if they are retriable](https://github.com/apache/iceberg/blob/9c8e9ba67f41797ded61c11fa90af2d8df0da1cd/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java#L91-L116). For the second case, the one that you are solving here, we need some more logic around loading the latest version of the table. The retry is being done on the `CommitFailedException`, which is thrown on an HTTP 409 response from the REST catalog. A 409 means a conflict: the table has changed. At this point, we only support append and overwrite operations, which don't need any conflict detection. Retrying the exact same commit without refreshing would just hit the same conflict again, which I believe is Einstein's definition of insanity :) Do we want to refresh the table metadata and reapply the changes? I would expect a test like the one in https://github.com/apache/iceberg-python/blob/b1e33d48bc2c06a812a59c14e37ef73da9968f50/tests/catalog/test_sql.py#L803-L823 to fail after this change.
########## pyiceberg/table/__init__.py: ########## @@ -791,6 +792,76 @@ class CommitTableResponse(IcebergBaseModel): metadata_location: str = Field(alias="metadata-location") +class TableCommitRetry: + """Decorator for building the table commit retry controller.""" + + num_retries = "commit.retry.num-retries" + num_retries_default: int = 4 + min_wait_ms = "commit.retry.min-wait-ms" + min_wait_ms_default: int = 100 + max_wait_ms = "commit.retry.max-wait-ms" + max_wait_ms_default: int = 60000 # 1 min + total_timeout_ms = "commit.retry.total-timeout-ms" + total_timeout_ms_default: int = 1800000 # 30 mins + + def __init__(self, func: Callable[..., Any], properties_attribute: str = "properties") -> None: + self.properties_attr: str = properties_attribute + self.func: Callable[..., Any] = func + self.loaded_properties: Properties = {} + + def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]: + """Return the __call__ method with the instance caller.""" + return partial(self.__call__, instance) + + def __call__(self, instance: Any, *args: Any, **kwargs: Any) -> Any: + """Run function with the retrying controller on the caller instance.""" + self.loaded_properties = getattr(instance, self.properties_attr) + try: + for attempt in self.build_retry_controller(): + with attempt: + result = self.func(instance, *args, **kwargs) + except RetryError as err: + raise Exception from err.reraise() + else: + return result + + @property + def table_properties(self) -> Properties: + """Get the table properties from the instance that is calling this decorator.""" + return self.loaded_properties + + def build_retry_controller(self) -> Retrying: + """Build the retry controller.""" + return Retrying( + stop=( + stop_after_attempt(self.get_config(self.num_retries, self.num_retries_default)) + | stop_after_delay( + datetime.timedelta(milliseconds=self.get_config(self.total_timeout_ms, self.total_timeout_ms_default)) + ) + ), + 
wait=wait_exponential(min=self.get_config(self.min_wait_ms, self.min_wait_ms_default) / 1000.0), + retry=retry_if_exception_type(CommitFailedException), + ) + + def get_config(self, config: str, default: int) -> int: + """Get config out of the properties.""" + return self.to_int(self.table_properties.get(config, ""), default) Review Comment: If the key doesn't exist, we try to convert the empty string. How about throwing in some walrus `:=`: ```suggestion return self.to_int(value) if (value := self.table_properties.get(config)) else default ``` ########## pyiceberg/table/__init__.py: ########## @@ -791,6 +792,76 @@ class CommitTableResponse(IcebergBaseModel): metadata_location: str = Field(alias="metadata-location") +class TableCommitRetry: + """Decorator for building the table commit retry controller.""" + + num_retries = "commit.retry.num-retries" + num_retries_default: int = 4 + min_wait_ms = "commit.retry.min-wait-ms" + min_wait_ms_default: int = 100 + max_wait_ms = "commit.retry.max-wait-ms" + max_wait_ms_default: int = 60000 # 1 min + total_timeout_ms = "commit.retry.total-timeout-ms" + total_timeout_ms_default: int = 1800000 # 30 mins + + def __init__(self, func: Callable[..., Any], properties_attribute: str = "properties") -> None: + self.properties_attr: str = properties_attribute + self.func: Callable[..., Any] = func + self.loaded_properties: Properties = {} + + def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]: + """Return the __call__ method with the instance caller.""" + return partial(self.__call__, instance) + + def __call__(self, instance: Any, *args: Any, **kwargs: Any) -> Any: + """Run function with the retrying controller on the caller instance.""" + self.loaded_properties = getattr(instance, self.properties_attr) + try: + for attempt in self.build_retry_controller(): + with attempt: + result = self.func(instance, *args, **kwargs) + except RetryError as err: + raise Exception from err.reraise() + else: + return result + +
@property + def table_properties(self) -> Properties: + """Get the table properties from the instance that is calling this decorator.""" + return self.loaded_properties + + def build_retry_controller(self) -> Retrying: + """Build the retry controller.""" + return Retrying( + stop=( + stop_after_attempt(self.get_config(self.num_retries, self.num_retries_default)) + | stop_after_delay( + datetime.timedelta(milliseconds=self.get_config(self.total_timeout_ms, self.total_timeout_ms_default)) + ) + ), + wait=wait_exponential(min=self.get_config(self.min_wait_ms, self.min_wait_ms_default) / 1000.0), + retry=retry_if_exception_type(CommitFailedException), + ) + + def get_config(self, config: str, default: int) -> int: + """Get config out of the properties.""" + return self.to_int(self.table_properties.get(config, ""), default) + + @staticmethod + def to_int(v: str, default: int) -> int: + """Convert str value to int, otherwise return a default.""" + try: + return int(v) + except (ValueError, TypeError): + pass Review Comment: Now we silently swallow the exception. What do you think of emitting a warning instead: ```python import warnings warnings.warn(f"Expected an integer for table property {config}, got: {v}") ``` ########## pyiceberg/table/__init__.py: ########## @@ -791,6 +792,76 @@ class CommitTableResponse(IcebergBaseModel): metadata_location: str = Field(alias="metadata-location") +class TableCommitRetry: + """Decorator for building the table commit retry controller.""" + + num_retries = "commit.retry.num-retries" + num_retries_default: int = 4 + min_wait_ms = "commit.retry.min-wait-ms" + min_wait_ms_default: int = 100 + max_wait_ms = "commit.retry.max-wait-ms" + max_wait_ms_default: int = 60000 # 1 min + total_timeout_ms = "commit.retry.total-timeout-ms" + total_timeout_ms_default: int = 1800000 # 30 mins + + def __init__(self, func: Callable[..., Any], properties_attribute: str = "properties") -> None: + self.properties_attr: str = properties_attribute + self.func: Callable[..., Any] = func +
self.loaded_properties: Properties = {} + + def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]: + """Return the __call__ method with the instance caller.""" + return partial(self.__call__, instance) + + def __call__(self, instance: Any, *args: Any, **kwargs: Any) -> Any: + """Run function with the retrying controller on the caller instance.""" + self.loaded_properties = getattr(instance, self.properties_attr) + try: + for attempt in self.build_retry_controller(): + with attempt: + result = self.func(instance, *args, **kwargs) + except RetryError as err: + raise Exception from err.reraise() + else: + return result + + @property + def table_properties(self) -> Properties: + """Get the table properties from the instance that is calling this decorator.""" + return self.loaded_properties + + def build_retry_controller(self) -> Retrying: + """Build the retry controller.""" + return Retrying( + stop=( + stop_after_attempt(self.get_config(self.num_retries, self.num_retries_default)) + | stop_after_delay( + datetime.timedelta(milliseconds=self.get_config(self.total_timeout_ms, self.total_timeout_ms_default)) + ) + ), + wait=wait_exponential(min=self.get_config(self.min_wait_ms, self.min_wait_ms_default) / 1000.0), + retry=retry_if_exception_type(CommitFailedException), + ) + + def get_config(self, config: str, default: int) -> int: + """Get config out of the properties.""" + return self.to_int(self.table_properties.get(config, ""), default) + + @staticmethod + def to_int(v: str, default: int) -> int: + """Convert str value to int, otherwise return a default.""" + try: + return int(v) + except (ValueError, TypeError): + pass + return default + + +def table_commit_retry(properties_attribute: str) -> Callable[..., TableCommitRetry]: Review Comment: ```suggestion def table_commit_retry(properties_attribute: str = "properties") -> Callable[..., TableCommitRetry]: ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org