BsoBird commented on code in PR #10623:
URL: https://github.com/apache/iceberg/pull/10623#discussion_r1669877984

##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java:
##########
@@ -159,18 +169,124 @@ public void commit(TableMetadata base, TableMetadata metadata) {
     int nextVersion = (current.first() != null ? current.first() : 0) + 1;
     Path finalMetadataFile = metadataFilePath(nextVersion, codec);
     FileSystem fs = getFileSystem(tempMetadataFile, conf);
+    boolean versionCommitSuccess = false;
+    boolean useObjectStore =
+        metadata.propertyAsBoolean(
+            TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT);
+    int previousVersionsMax =
+        metadata.propertyAsInt(
+            TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
+            TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT);
+    // todo:Currently, if the user is using an object store, we assume that he must be using the
+    //  global locking service. But we should support add some other conditions in future.
+    boolean supportGlobalLocking = useObjectStore;
+    try {
+      tryLock(tempMetadataFile, metadataRoot());
+      versionCommitSuccess =
+          commitNewVersion(
+              fs, tempMetadataFile, finalMetadataFile, nextVersion, supportGlobalLocking);
+      if (!versionCommitSuccess) {
+        throw new CommitFailedException(
+            "Can not commit newMetaData because version [%s] has already been committed. tempMetaData=[%s],finalMetaData=[%s].Are there other clients running in parallel with the current task?",
+            nextVersion, tempMetadataFile, finalMetadataFile);
+      }
+      validate(supportGlobalLocking, previousVersionsMax, nextVersion, fs, finalMetadataFile);
+      this.shouldRefresh = true;
+      LOG.info("Committed a new metadata file {}", finalMetadataFile);
+      // update the best-effort version pointer
+      writeVersionHint(fs, nextVersion);
+      deleteRemovedMetadataFiles(base, metadata);
+    } catch (CommitStateUnknownException e) {
+      this.shouldRefresh = true;
+      throw e;
+    } catch (Exception e) {
+      this.shouldRefresh = versionCommitSuccess;
+      if (!versionCommitSuccess) {
+        tryDelete(tempMetadataFile);
+        throw new CommitFailedException(e);
+      }
+    } finally {
+      unlock(tempMetadataFile, metadataRoot());
+    }
+  }
 
-    // this rename operation is the atomic commit operation
-    renameToFinal(fs, tempMetadataFile, finalMetadataFile, nextVersion);
+  private void tryDelete(Path path) {
+    try {
+      io().deleteFile(path.toString());
+    } catch (Exception ignored) {
+      // do nothing
+    }
+  }
 
-    LOG.info("Committed a new metadata file {}", finalMetadataFile);
+  @VisibleForTesting
+  void tryLock(Path src, Path dst) {
+    if (!lockManager.acquire(dst.toString(), src.toString())) {
+      throw new CommitFailedException(
+          "Failed to acquire lock on file: %s with owner: %s", dst, src);
+    }
+  }
 
-    // update the best-effort version pointer
-    writeVersionHint(nextVersion);
+  void unlock(Path src, Path dst) {
+    try {
+      if (!lockManager.release(dst.toString(), src.toString())) {
+        LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src);
+      }
+    } catch (Exception ignored) {
+      // do nothing.
+    }
+  }
 
-    deleteRemovedMetadataFiles(base, metadata);
+  private void validate(
+      boolean supportGlobalLocking,
+      int previousVersionsMax,
+      int nextVersion,
+      FileSystem fs,
+      Path finalMetadataFile)
+      throws IOException {
+    if (!supportGlobalLocking) {
+      fastFailIfDirtyCommit(previousVersionsMax, nextVersion, fs, finalMetadataFile);
+      cleanAllTooOldDirtyCommit(fs, previousVersionsMax);
+    }
+  }
 
-    this.shouldRefresh = true;
+  @VisibleForTesting
+  void fastFailIfDirtyCommit(
+      int previousVersionsMax, int nextVersion, FileSystem fs, Path finalMetadataFile)
+      throws IOException {
+    int currentMaxVersion = findVersionWithOutVersionHint(fs);
+    if ((currentMaxVersion - nextVersion) > previousVersionsMax && fs.exists(finalMetadataFile)) {
+      tryDelete(finalMetadataFile);
+      throw new CommitStateUnknownException(
+          new RejectedExecutionException(

Review Comment:
   Under some boundary conditions this check can produce a false positive: a 
normal commit may be misidentified as a dirty commit. However, because we throw 
CommitStateUnknownException, this does not affect normal commits by other 
clients.
   
   A false positive can occur when deleteRemovedMetadataFiles was not executed 
during previous commits because of an exception.
   
   Even if that happens, I think it is still better than a dirty commit: at 
least we have not destroyed any data.
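
For reference, here is a minimal sketch of the condition under discussion, written as a pure function so that its boundary behaviour is easy to see. `DirtyCommitCheck` and `looksDirty` are hypothetical names that are not part of the PR, and the version numbers and the limit of 100 are made-up inputs. The sketch does not model how `findVersionWithOutVersionHint` derives the maximum version, so it does not reproduce the false positive itself.

```java
/** Hypothetical stand-in for the condition in fastFailIfDirtyCommit; not Iceberg code. */
final class DirtyCommitCheck {
  private DirtyCommitCheck() {}

  /**
   * Mirrors the quoted condition: a commit is flagged when the highest version found in the
   * metadata directory is more than previousVersionsMax ahead of the version just written,
   * and the metadata file for that version exists.
   */
  static boolean looksDirty(
      int currentMaxVersion, int nextVersion, int previousVersionsMax, boolean finalFileExists) {
    return (currentMaxVersion - nextVersion) > previousVersionsMax && finalFileExists;
  }

  public static void main(String[] args) {
    int previousVersionsMax = 100; // assumed value, for illustration only

    // A client writes v5 while the directory already holds v200: flagged.
    System.out.println(looksDirty(200, 5, previousVersionsMax, true));
    // The client writes the newest version: not flagged.
    System.out.println(looksDirty(5, 5, previousVersionsMax, true));
    // Gap exactly equal to the limit: not flagged, because the comparison is strict.
    System.out.println(looksDirty(105, 5, previousVersionsMax, true));
  }
}
```

Because the comparison is strict, a gap exactly equal to `previousVersionsMax` passes. Only a client that is further behind than the retention window is rejected.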



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

