BsoBird commented on code in PR #10623: URL: https://github.com/apache/iceberg/pull/10623#discussion_r1669877984
########## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ########## @@ -159,18 +169,124 @@ public void commit(TableMetadata base, TableMetadata metadata) { int nextVersion = (current.first() != null ? current.first() : 0) + 1; Path finalMetadataFile = metadataFilePath(nextVersion, codec); FileSystem fs = getFileSystem(tempMetadataFile, conf); + boolean versionCommitSuccess = false; + boolean useObjectStore = + metadata.propertyAsBoolean( + TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT); + int previousVersionsMax = + metadata.propertyAsInt( + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT); + // todo:Currently, if the user is using an object store, we assume that he must be using the + // global locking service. But we should support add some other conditions in future. + boolean supportGlobalLocking = useObjectStore; + try { + tryLock(tempMetadataFile, metadataRoot()); + versionCommitSuccess = + commitNewVersion( + fs, tempMetadataFile, finalMetadataFile, nextVersion, supportGlobalLocking); + if (!versionCommitSuccess) { + throw new CommitFailedException( + "Can not commit newMetaData because version [%s] has already been committed. 
tempMetaData=[%s],finalMetaData=[%s].Are there other clients running in parallel with the current task?", + nextVersion, tempMetadataFile, finalMetadataFile); + } + validate(supportGlobalLocking, previousVersionsMax, nextVersion, fs, finalMetadataFile); + this.shouldRefresh = true; + LOG.info("Committed a new metadata file {}", finalMetadataFile); + // update the best-effort version pointer + writeVersionHint(fs, nextVersion); + deleteRemovedMetadataFiles(base, metadata); + } catch (CommitStateUnknownException e) { + this.shouldRefresh = true; + throw e; + } catch (Exception e) { + this.shouldRefresh = versionCommitSuccess; + if (!versionCommitSuccess) { + tryDelete(tempMetadataFile); + throw new CommitFailedException(e); + } + } finally { + unlock(tempMetadataFile, metadataRoot()); + } + } - // this rename operation is the atomic commit operation - renameToFinal(fs, tempMetadataFile, finalMetadataFile, nextVersion); + private void tryDelete(Path path) { + try { + io().deleteFile(path.toString()); + } catch (Exception ignored) { + // do nothing + } + } - LOG.info("Committed a new metadata file {}", finalMetadataFile); + @VisibleForTesting + void tryLock(Path src, Path dst) { + if (!lockManager.acquire(dst.toString(), src.toString())) { + throw new CommitFailedException( + "Failed to acquire lock on file: %s with owner: %s", dst, src); + } + } - // update the best-effort version pointer - writeVersionHint(nextVersion); + void unlock(Path src, Path dst) { + try { + if (!lockManager.release(dst.toString(), src.toString())) { + LOG.warn("Failed to release lock on file: {} with owner: {}", dst, src); + } + } catch (Exception ignored) { + // do nothing. 
+ } + } - deleteRemovedMetadataFiles(base, metadata); + private void validate( + boolean supportGlobalLocking, + int previousVersionsMax, + int nextVersion, + FileSystem fs, + Path finalMetadataFile) + throws IOException { + if (!supportGlobalLocking) { + fastFailIfDirtyCommit(previousVersionsMax, nextVersion, fs, finalMetadataFile); + cleanAllTooOldDirtyCommit(fs, previousVersionsMax); + } + } - this.shouldRefresh = true; + @VisibleForTesting + void fastFailIfDirtyCommit( + int previousVersionsMax, int nextVersion, FileSystem fs, Path finalMetadataFile) + throws IOException { + int currentMaxVersion = findVersionWithOutVersionHint(fs); + if ((currentMaxVersion - nextVersion) > previousVersionsMax && fs.exists(finalMetadataFile)) { + tryDelete(finalMetadataFile); + throw new CommitStateUnknownException( + new RejectedExecutionException( Review Comment: Under some boundary conditions, this check can misfire and identify a normal commit as a dirty commit. However, in that case we throw a CommitStateUnknownException, so the normal commits of other clients are not affected. The false positive can occur when, during previous commits, the deleteRemovedMetadataFiles method was not executed because of an exception (so old metadata files were never cleaned up). Even if that happens, I think this outcome is still better than a dirty commit: at least we do not destroy any data. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org