Fokko commented on code in PR #330:
URL: https://github.com/apache/iceberg-python/pull/330#discussion_r1473948988


##########
pyiceberg/table/__init__.py:
##########
@@ -791,6 +792,76 @@ class CommitTableResponse(IcebergBaseModel):
     metadata_location: str = Field(alias="metadata-location")
 
 
+class TableCommitRetry:
+    """Decorator for building the table commit retry controller."""
+
+    num_retries = "commit.retry.num-retries"
+    num_retries_default: int = 4
+    min_wait_ms = "commit.retry.min-wait-ms"
+    min_wait_ms_default: int = 100
+    max_wait_ms = "commit.retry.max-wait-ms"
+    max_wait_ms_default: int = 60000  # 1 min
+    total_timeout_ms = "commit.retry.total-timeout-ms"
+    total_timeout_ms_default: int = 1800000  # 30 mins
+
+    def __init__(self, func: Callable[..., Any], properties_attribute: str = 
"properties") -> None:
+        self.properties_attr: str = properties_attribute
+        self.func: Callable[..., Any] = func
+        self.loaded_properties: Properties = {}
+
+    def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]:
+        """Return the __call__ method with the instance caller."""
+        return partial(self.__call__, instance)
+
+    def __call__(self, instance: Any, *args: Any, **kwargs: Any) -> Any:
+        """Run function with the retrying controller on the caller instance."""
+        self.loaded_properties = getattr(instance, self.properties_attr)
+        try:
+            for attempt in self.build_retry_controller():
+                with attempt:
+                    result = self.func(instance, *args, **kwargs)
+        except RetryError as err:
+            raise Exception from err.reraise()
+        else:
+            return result
+
+    @property
+    def table_properties(self) -> Properties:
+        """Get the table properties from the instance that is calling this 
decorator."""
+        return self.loaded_properties

Review Comment:
   nit: This is more of a style point: I would probably leave out this property, 
since it just returns the same value as `loaded_properties`. As it stands, the table 
properties map has two names (`loaded_properties` and `table_properties`).



##########
pyiceberg/table/__init__.py:
##########
@@ -994,6 +1065,7 @@ def refs(self) -> Dict[str, SnapshotRef]:
         """Return the snapshot references in the table."""
         return self.metadata.refs
 
+    @table_commit_retry("properties")

Review Comment:
   It depends on what we are trying to do here. There are two types of retries 
that we want to support:
   
   - Intermittent network issues, catalog temporarily not available, etc.
   - Retrying of commits because the table changed.
   
   The first one should probably be handled at the catalog level, because we also 
need to differentiate between the different errors and [see if they are 
retriable](https://github.com/apache/iceberg/blob/9c8e9ba67f41797ded61c11fa90af2d8df0da1cd/core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java#L91-L116).
   
   For the second case, the one that you are solving here, we need some more 
logic around loading the latest version of the table. The retry is triggered by 
`CommitFailedException`, which the REST catalog raises on an HTTP 409. A 409 
means there is a conflict: the table has changed since we loaded it. At this point, 
we only support append and overwrite operations, which don't need any conflict 
detection. However, retrying the exact same commit without refreshing the table 
metadata will just hit the same 409 again. I believe that's Einstein's definition 
of insanity :)
   
   Do we want to refresh the table metadata and reapply the changes? I would 
expect a test like the one in 
https://github.com/apache/iceberg-python/blob/b1e33d48bc2c06a812a59c14e37ef73da9968f50/tests/catalog/test_sql.py#L803-L823
 to fail after this change.



##########
pyiceberg/table/__init__.py:
##########
@@ -791,6 +792,76 @@ class CommitTableResponse(IcebergBaseModel):
     metadata_location: str = Field(alias="metadata-location")
 
 
+class TableCommitRetry:
+    """Decorator for building the table commit retry controller."""
+
+    num_retries = "commit.retry.num-retries"
+    num_retries_default: int = 4
+    min_wait_ms = "commit.retry.min-wait-ms"
+    min_wait_ms_default: int = 100
+    max_wait_ms = "commit.retry.max-wait-ms"
+    max_wait_ms_default: int = 60000  # 1 min
+    total_timeout_ms = "commit.retry.total-timeout-ms"
+    total_timeout_ms_default: int = 1800000  # 30 mins
+
+    def __init__(self, func: Callable[..., Any], properties_attribute: str = 
"properties") -> None:
+        self.properties_attr: str = properties_attribute
+        self.func: Callable[..., Any] = func
+        self.loaded_properties: Properties = {}
+
+    def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]:
+        """Return the __call__ method with the instance caller."""
+        return partial(self.__call__, instance)
+
+    def __call__(self, instance: Any, *args: Any, **kwargs: Any) -> Any:
+        """Run function with the retrying controller on the caller instance."""
+        self.loaded_properties = getattr(instance, self.properties_attr)
+        try:
+            for attempt in self.build_retry_controller():
+                with attempt:
+                    result = self.func(instance, *args, **kwargs)
+        except RetryError as err:
+            raise Exception from err.reraise()
+        else:
+            return result
+
+    @property
+    def table_properties(self) -> Properties:
+        """Get the table properties from the instance that is calling this 
decorator."""
+        return self.loaded_properties
+
+    def build_retry_controller(self) -> Retrying:
+        """Build the retry controller."""
+        return Retrying(
+            stop=(
+                stop_after_attempt(self.get_config(self.num_retries, 
self.num_retries_default))
+                | stop_after_delay(
+                    
datetime.timedelta(milliseconds=self.get_config(self.total_timeout_ms, 
self.total_timeout_ms_default))
+                )
+            ),
+            wait=wait_exponential(min=self.get_config(self.min_wait_ms, 
self.min_wait_ms_default) / 1000.0),
+            retry=retry_if_exception_type(CommitFailedException),
+        )
+
+    def get_config(self, config: str, default: int) -> int:
+        """Get config out of the properties."""
+        return self.to_int(self.table_properties.get(config, ""), default)

Review Comment:
   If the key doesn't exist, we try to convert the empty string to an int. How 
about using the walrus operator `:=`:
   
   ```suggestion
           return self.to_int(value) if (value := 
self.table_properties.get(config)) else default
   ```
   



##########
pyiceberg/table/__init__.py:
##########
@@ -791,6 +792,76 @@ class CommitTableResponse(IcebergBaseModel):
     metadata_location: str = Field(alias="metadata-location")
 
 
+class TableCommitRetry:
+    """Decorator for building the table commit retry controller."""
+
+    num_retries = "commit.retry.num-retries"
+    num_retries_default: int = 4
+    min_wait_ms = "commit.retry.min-wait-ms"
+    min_wait_ms_default: int = 100
+    max_wait_ms = "commit.retry.max-wait-ms"
+    max_wait_ms_default: int = 60000  # 1 min
+    total_timeout_ms = "commit.retry.total-timeout-ms"
+    total_timeout_ms_default: int = 1800000  # 30 mins
+
+    def __init__(self, func: Callable[..., Any], properties_attribute: str = 
"properties") -> None:
+        self.properties_attr: str = properties_attribute
+        self.func: Callable[..., Any] = func
+        self.loaded_properties: Properties = {}
+
+    def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]:
+        """Return the __call__ method with the instance caller."""
+        return partial(self.__call__, instance)
+
+    def __call__(self, instance: Any, *args: Any, **kwargs: Any) -> Any:
+        """Run function with the retrying controller on the caller instance."""
+        self.loaded_properties = getattr(instance, self.properties_attr)
+        try:
+            for attempt in self.build_retry_controller():
+                with attempt:
+                    result = self.func(instance, *args, **kwargs)
+        except RetryError as err:
+            raise Exception from err.reraise()
+        else:
+            return result
+
+    @property
+    def table_properties(self) -> Properties:
+        """Get the table properties from the instance that is calling this 
decorator."""
+        return self.loaded_properties
+
+    def build_retry_controller(self) -> Retrying:
+        """Build the retry controller."""
+        return Retrying(
+            stop=(
+                stop_after_attempt(self.get_config(self.num_retries, 
self.num_retries_default))
+                | stop_after_delay(
+                    
datetime.timedelta(milliseconds=self.get_config(self.total_timeout_ms, 
self.total_timeout_ms_default))
+                )
+            ),
+            wait=wait_exponential(min=self.get_config(self.min_wait_ms, 
self.min_wait_ms_default) / 1000.0),
+            retry=retry_if_exception_type(CommitFailedException),
+        )
+
+    def get_config(self, config: str, default: int) -> int:
+        """Get config out of the properties."""
+        return self.to_int(self.table_properties.get(config, ""), default)
+
+    @staticmethod
+    def to_int(v: str, default: int) -> int:
+        """Convert str value to int, otherwise return a default."""
+        try:
+            return int(v)
+        except (ValueError, TypeError):
+            pass

Review Comment:
   Now we silently swallow the exception; what do you think of:
   
   ```python
           import warnings
           warnings.warn("Expected an integer for table property {config}, got: 
{v}")
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -791,6 +792,76 @@ class CommitTableResponse(IcebergBaseModel):
     metadata_location: str = Field(alias="metadata-location")
 
 
+class TableCommitRetry:
+    """Decorator for building the table commit retry controller."""
+
+    num_retries = "commit.retry.num-retries"
+    num_retries_default: int = 4
+    min_wait_ms = "commit.retry.min-wait-ms"
+    min_wait_ms_default: int = 100
+    max_wait_ms = "commit.retry.max-wait-ms"
+    max_wait_ms_default: int = 60000  # 1 min
+    total_timeout_ms = "commit.retry.total-timeout-ms"
+    total_timeout_ms_default: int = 1800000  # 30 mins
+
+    def __init__(self, func: Callable[..., Any], properties_attribute: str = 
"properties") -> None:
+        self.properties_attr: str = properties_attribute
+        self.func: Callable[..., Any] = func
+        self.loaded_properties: Properties = {}
+
+    def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]:
+        """Return the __call__ method with the instance caller."""
+        return partial(self.__call__, instance)
+
+    def __call__(self, instance: Any, *args: Any, **kwargs: Any) -> Any:
+        """Run function with the retrying controller on the caller instance."""
+        self.loaded_properties = getattr(instance, self.properties_attr)
+        try:
+            for attempt in self.build_retry_controller():
+                with attempt:
+                    result = self.func(instance, *args, **kwargs)
+        except RetryError as err:
+            raise Exception from err.reraise()
+        else:
+            return result
+
+    @property
+    def table_properties(self) -> Properties:
+        """Get the table properties from the instance that is calling this 
decorator."""
+        return self.loaded_properties
+
+    def build_retry_controller(self) -> Retrying:
+        """Build the retry controller."""
+        return Retrying(
+            stop=(
+                stop_after_attempt(self.get_config(self.num_retries, 
self.num_retries_default))
+                | stop_after_delay(
+                    
datetime.timedelta(milliseconds=self.get_config(self.total_timeout_ms, 
self.total_timeout_ms_default))
+                )
+            ),
+            wait=wait_exponential(min=self.get_config(self.min_wait_ms, 
self.min_wait_ms_default) / 1000.0),
+            retry=retry_if_exception_type(CommitFailedException),
+        )
+
+    def get_config(self, config: str, default: int) -> int:
+        """Get config out of the properties."""
+        return self.to_int(self.table_properties.get(config, ""), default)
+
+    @staticmethod
+    def to_int(v: str, default: int) -> int:
+        """Convert str value to int, otherwise return a default."""
+        try:
+            return int(v)
+        except (ValueError, TypeError):
+            pass
+        return default
+
+
+def table_commit_retry(properties_attribute: str) -> Callable[..., 
TableCommitRetry]:

Review Comment:
   ```suggestion
   def table_commit_retry(properties_attribute: str = "properties") -> 
Callable[..., TableCommitRetry]:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to