qzyu999 commented on code in PR #3320:
URL: https://github.com/apache/iceberg-python/pull/3320#discussion_r3269503332


##########
pyiceberg/table/update/snapshot.py:
##########
@@ -351,11 +359,39 @@ def new_manifest_output(self) -> OutputFile:
         location_provider = self._transaction._table.location_provider()
         file_name = 
_new_manifest_file_name(num=next(self._manifest_num_counter), 
commit_uuid=self.commit_uuid)
         file_path = location_provider.new_metadata_location(file_name)
+        self._written_manifests.append(file_path)
         return self._io.new_output(file_path)
 
     def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: 
bool = True) -> list[ManifestEntry]:
         return manifest.fetch_manifest_entry(io=self._io, 
discard_deleted=discard_deleted)
 
+    def commit(self) -> None:
+        self._transaction._register_snapshot_producer(self)
+        self._transaction._apply(*self._commit())
+
+    def _cleanup_uncommitted(self) -> None:
+        """Delete manifest files from failed retry attempts."""
+        for path in self._uncommitted_manifests:
+            try:
+                self._io.delete(path)
+            except Exception:
+                logger.warning("Failed to delete uncommitted manifest: %s", 
path, exc_info=True)
+        self._uncommitted_manifests.clear()

Review Comment:
   suggestion: We could also add a second similar function as follows:
   ```python
       def _clean_all_uncommitted(self) -> None:
           """Clean up all manifests written during this producer's lifecycle 
on abort."""
           for path in itertools.chain(self._uncommitted_manifests, 
self._written_manifests):
               try:
                   self._io.delete(path)
               except Exception:
                   logger.warning("Failed to delete uncommitted manifest: %s", 
path, exc_info=True)
           self._uncommitted_manifests.clear()
           self._written_manifests.clear()
   ```
   then in `Transaciton.commit_transaction()`, we can add a try/except to the 
for-loop as follows:
   ```python
           try:
               for attempt in range(num_retries + 1):
                   try:
                       self._table._do_commit(...)
                       self._cleanup_uncommitted_manifests()
                       break
                   except CommitFailedException:
                       ... # retry logic
           except Exception:
               # Catch ValidationException or retry exhaustion
               for producer in self._snapshot_producers:
                   producer._clean_all_uncommitted()
               raise
   ```
   this would then allow the PyIceberg implementation to mirror the 
`cleanAll()` method in Java. In the current implementation, the for-loop for 
retrying will only clear out the `_uncommitted_manifests` from the previous 
failed retries, but we can extend this with `_clean_all_uncommitted` which will 
 clear out that and `_written_manifests` from the current attempt in the case 
of a permanent abort. This would fix the gap for orphaned manifests from 
`ValidationException` (or other permanent failures) that are not cleaned up. I 
also think it's worth mentioning that this fix could be cleanly added to this 
PR without waiting for a full Delete orphaned files implementation in 
PyIceberg. WDYT about adding this into the current PR?



##########
pyiceberg/table/__init__.py:
##########
@@ -939,17 +975,73 @@ def commit_transaction(self) -> Table:
             The table with the updates applied.
         """
         if len(self._updates) > 0:
-            self._requirements += 
(AssertTableUUID(uuid=self.table_metadata.table_uuid),)
-            self._table._do_commit(  # pylint: disable=W0212
-                updates=self._updates,
-                requirements=self._requirements,
+            from pyiceberg.utils.properties import property_as_int
+
+            properties = self._table.metadata.properties
+            num_retries_val = property_as_int(
+                properties, TableProperties.COMMIT_NUM_RETRIES, 
TableProperties.COMMIT_NUM_RETRIES_DEFAULT
+            )
+            num_retries = num_retries_val if num_retries_val is not None else 
TableProperties.COMMIT_NUM_RETRIES_DEFAULT
+            min_wait_val = property_as_int(
+                properties, TableProperties.COMMIT_MIN_RETRY_WAIT_MS, 
TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
+            )
+            min_wait_ms = min_wait_val if min_wait_val is not None else 
TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
+            max_wait_val = property_as_int(
+                properties, TableProperties.COMMIT_MAX_RETRY_WAIT_MS, 
TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
             )
+            max_wait_ms = max_wait_val if max_wait_val is not None else 
TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
+            total_timeout_val = property_as_int(
+                properties, TableProperties.COMMIT_TOTAL_RETRY_TIME_MS, 
TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
+            )
+            total_timeout_ms = (
+                total_timeout_val if total_timeout_val is not None else 
TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
+            )
+            start_time = time.monotonic()
+
+            for attempt in range(num_retries + 1):
+                try:
+                    self._requirements += 
(AssertTableUUID(uuid=self.table_metadata.table_uuid),)

Review Comment:
   suggestion: Here `AssertTableUUID` is appended to `self._requirements` 
within each retry loop, but below in `_rebuild_snapshot_updates` it's removed 
again with:
   
   > `self._requirements = tuple(r for r in self._requirements if not 
isinstance(r, (AssertRefSnapshotId, AssertTableUUID)))`
   
   This can be simplified by moving `self._requirements += 
(AssertTableUUID(uuid=self.table_metadata.table_uuid),)` outside the for-loop 
and updating the line in `_rebuild_snapshot_updates` to simply:
   
   > `self._requirements = tuple(r for r in self._requirements if not 
isinstance(r, AssertRefSnapshotId))`
   
   The reason being is that `AssertTableUUID` would remain constant the whole 
time, so we're simply adding and removing it within each retry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to