BsoBird commented on code in PR #10623: URL: https://github.com/apache/iceberg/pull/10623#discussion_r1673311069
########## core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java: ########## @@ -291,137 +407,171 @@ Path versionHintFile() { return metadataPath(Util.VERSION_HINT_FILENAME); } - private void writeVersionHint(int versionToWrite) { + @VisibleForTesting + void writeVersionHint(FileSystem fs, Integer versionToWrite) throws Exception { Path versionHintFile = versionHintFile(); - FileSystem fs = getFileSystem(versionHintFile, conf); - + Path tempVersionHintFile = metadataPath(UUID.randomUUID() + "-version-hint.temp"); try { - Path tempVersionHintFile = metadataPath(UUID.randomUUID().toString() + "-version-hint.temp"); writeVersionToPath(fs, tempVersionHintFile, versionToWrite); - fs.delete(versionHintFile, false /* recursive delete */); fs.rename(tempVersionHintFile, versionHintFile); } catch (IOException e) { - LOG.warn("Failed to update version hint", e); + // Cleaning up temporary files. + if (fs.exists(tempVersionHintFile)) { + io().deleteFile(tempVersionHintFile.toString()); + } + throw e; } } - private void writeVersionToPath(FileSystem fs, Path path, int versionToWrite) throws IOException { + @VisibleForTesting + boolean nextVersionIsLatest(int nextVersion, int currentMaxVersion) { + return nextVersion == (currentMaxVersion + 1); + } + + private void writeVersionToPath(FileSystem fs, Path path, int versionToWrite) { try (FSDataOutputStream out = fs.create(path, false /* overwrite */)) { out.write(String.valueOf(versionToWrite).getBytes(StandardCharsets.UTF_8)); + } catch (IOException e) { + throw new RuntimeIOException(e); } } @VisibleForTesting - int findVersion() { - Path versionHintFile = versionHintFile(); - FileSystem fs = getFileSystem(versionHintFile, conf); - + int findVersionByUsingVersionHint(FileSystem fs, Path versionHintFile) throws IOException { try (InputStreamReader fsr = new InputStreamReader(fs.open(versionHintFile), StandardCharsets.UTF_8); BufferedReader in = new BufferedReader(fsr)) { return 
Integer.parseInt(in.readLine().replace("\n", "")); + } + } - } catch (Exception e) { - try { - if (fs.exists(metadataRoot())) { - LOG.warn("Error reading version hint file {}", versionHintFile, e); - } else { - LOG.debug("Metadata for table not found in directory {}", metadataRoot(), e); - return 0; - } - - // List the metadata directory to find the version files, and try to recover the max - // available version - FileStatus[] files = - fs.listStatus( - metadataRoot(), name -> VERSION_PATTERN.matcher(name.getName()).matches()); - int maxVersion = 0; - - for (FileStatus file : files) { - int currentVersion = version(file.getPath().getName()); - if (currentVersion > maxVersion && getMetadataFile(currentVersion) != null) { - maxVersion = currentVersion; - } - } - - return maxVersion; - } catch (IOException io) { - LOG.warn("Error trying to recover version-hint.txt data for {}", versionHintFile, e); + @VisibleForTesting + int findVersionWithOutVersionHint(FileSystem fs) { + try { + if (!fs.exists(metadataRoot())) { + // Either the table has just been created, or it has been corrupted, but either way, we have + // to start at version 0. + LOG.warn("Metadata for table not found in directory [{}]", metadataRoot()); return 0; } + // List the metadata directory to find the version files, and try to recover the max + // available version + FileStatus[] files = + fs.listStatus(metadataRoot(), name -> VERSION_PATTERN.matcher(name.getName()).matches()); + int maxVersion = 0; + for (FileStatus file : files) { + int currentVersion = version(file.getPath().getName()); + if (currentVersion > maxVersion && getMetadataFile(currentVersion) != null) { + maxVersion = currentVersion; + } + } + return maxVersion; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @VisibleForTesting + int findVersion() { + Path versionHintFile = versionHintFile(); + FileSystem fs = getFileSystem(versionHintFile, conf); + try { + return fs.exists(versionHintFile) + ? 
findVersionByUsingVersionHint(fs, versionHintFile) + : findVersionWithOutVersionHint(fs); + } catch (Exception e) { + // try one last time + return findVersionWithOutVersionHint(fs); } } /** - * Renames the source file to destination, using the provided file system. If the rename failed, - * an attempt will be made to delete the source file. + * Renames the source file to destination, using the provided file system. * * @param fs the filesystem used for the rename * @param src the source file * @param dst the destination file + * @return If it returns true, then the commit was successful. */ - private void renameToFinal(FileSystem fs, Path src, Path dst, int nextVersion) { - try { - if (!lockManager.acquire(dst.toString(), src.toString())) { - throw new CommitFailedException( - "Failed to acquire lock on file: %s with owner: %s", dst, src); + @VisibleForTesting + boolean commitNewVersion( + FileSystem fs, Path src, Path dst, Integer nextVersion, boolean supportGlobalLocking) + throws IOException { + if (fs.exists(dst)) { + throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst); + } + int maxVersion = supportGlobalLocking ? findVersion() : findVersionWithOutVersionHint(fs); + if (!nextVersionIsLatest(nextVersion, maxVersion)) { + if (!supportGlobalLocking) { + io().deleteFile(versionHintFile().toString()); } + throw new CommitFailedException( + "Cannot commit version [%d] because it is smaller or much larger than the current latest version [%d].Are there other clients running in parallel with the current task?", + nextVersion, maxVersion); + } + io().deleteFile(versionHintFile().toString()); Review Comment: We are always actively removing the versionHintFile. 
Doing so may slightly interfere with other clients' reads: in some extreme cases, looking up the current version may become slightly slower. This is because, without a version hint file, findVersion falls back to listing the metadata directory. However, deleting the hint lets us avoid, as far as possible, the problems caused by inaccurate information in versionHintFile, and it greatly improves the reliability of the findVersion method. In previous discussions, Russell and I agreed that this is an acceptable trade-off. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org