This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e3170d63199 [bugfix](hive)fix after insert overwrite hive table, data error for 2.1 (#43049) (#43127)
e3170d63199 is described below

commit e3170d63199d1b6a015ce058f2bd050140dd733d
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Sun Nov 3 23:38:47 2024 +0800

    [bugfix](hive)fix after insert overwrite hive table, data error for 2.1 (#43049) (#43127)

    bp: #43049
---
 .../doris/datasource/hive/HMSTransaction.java      | 30 ++++++++++++++++------
 .../doris/fs/remote/RemoteFSPhantomManager.java    |  9 +++++++
 .../org/apache/doris/fs/remote/S3FileSystem.java   | 15 +++++++++--
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  6 ++---
 .../org/apache/doris/planner/HiveTableSink.java    |  2 +-
 5 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 3044b214ab6..8041904723a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -100,7 +100,7 @@ public class HMSTransaction implements Transaction {
     private final Executor fileSystemExecutor;
     private HmsCommitter hmsCommitter;
     private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
-    private String declaredIntentionsToWrite;
+    private Optional<String> stagingDirectory;
     private boolean isMockedPartitionUpdate = false;
 
     private static class UncompletedMpuPendingUpload {
@@ -177,10 +177,14 @@ public class HMSTransaction implements Transaction {
     }
 
     public void beginInsertTable(HiveInsertCommandContext ctx) {
-        declaredIntentionsToWrite = ctx.getWritePath();
         queryId = ctx.getQueryId();
         isOverwrite = ctx.isOverwrite();
         fileType = ctx.getFileType();
+        if (fileType == TFileType.FILE_S3) {
+            stagingDirectory = Optional.empty();
+        } else {
+            stagingDirectory = Optional.of(ctx.getWritePath());
+        }
     }
 
     public void finishInsertTable(SimpleTableInfo tableInfo) {
@@ -200,10 +204,12 @@ public class HMSTransaction implements Transaction {
                 }
             });
         } else {
-            fs.makeDir(declaredIntentionsToWrite);
-            setLocation(new THiveLocationParams() {{
-                    setWritePath(declaredIntentionsToWrite);
-                }
+            stagingDirectory.ifPresent((v) -> {
+                fs.makeDir(v);
+                setLocation(new THiveLocationParams() {{
+                        setWritePath(v);
+                    }
+                });
             });
         }
     }
@@ -636,15 +642,23 @@ public class HMSTransaction implements Transaction {
         if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
             LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
                     directory.toString(), deleteResult.getNotDeletedEligibleItems());
+            throw new RuntimeException(
+                    "Failed to delete directory for files: " + deleteResult.getNotDeletedEligibleItems());
         } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
             LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString());
+            throw new RuntimeException("Failed to delete directory for empty dir: " + directory.toString());
         }
     }
 
     private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir, boolean reverse) {
         try {
-            if (!fs.directoryExists(directory.toString()).ok()) {
+            Status status = fs.directoryExists(directory.toString());
+            if (status.getErrCode().equals(Status.ErrCode.NOT_FOUND)) {
                 return new DeleteRecursivelyResult(true, ImmutableList.of());
+            } else if (!status.ok()) {
+                ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
+                notDeletedEligibleItems.add(directory.toString() + "/*");
+                return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
             }
         } catch (Exception e) {
             ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
@@ -1440,7 +1454,7 @@ public class HMSTransaction implements Transaction {
     }
 
     private void pruneAndDeleteStagingDirectories() {
-        recursiveDeleteItems(new Path(declaredIntentionsToWrite), true, false);
+        stagingDirectory.ifPresent((v) -> recursiveDeleteItems(new Path(v), true, false));
     }
 
     private void abortMultiUploads() {
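[Note, not part of the patch] The HMSTransaction change does two things: the staging directory becomes an Optional that is simply absent for S3 writes, and a delete or directory-existence failure now aborts the commit instead of being logged and treated as success. Below is a minimal, self-contained sketch of that control flow; ErrCode, stagingDirectory and nothingToDelete are illustrative stand-ins, not Doris APIs.

    import java.util.Optional;

    // Hypothetical stand-ins, reduced to the control flow of the patch.
    enum ErrCode { OK, NOT_FOUND, OTHER }

    public class StagingDirSketch {

        // For S3 writes there is no staging directory; for HDFS-style writes it
        // carries the write path taken from the insert context.
        static Optional<String> stagingDirectory(boolean isS3Write, String writePath) {
            return isS3Write ? Optional.empty() : Optional.of(writePath);
        }

        // Only a missing directory may be treated as "nothing to delete"; any other
        // failure must abort instead of silently reporting success.
        static boolean nothingToDelete(ErrCode existsStatus) {
            if (existsStatus == ErrCode.NOT_FOUND) {
                return true;
            }
            if (existsStatus != ErrCode.OK) {
                throw new RuntimeException("cannot verify directory, refusing to continue");
            }
            return false;
        }

        public static void main(String[] args) {
            stagingDirectory(false, "hdfs://ns1/tmp/query_1").ifPresent(dir ->
                    System.out.println("would create and later clean up " + dir));
            System.out.println(nothingToDelete(ErrCode.NOT_FOUND)); // true
        }
    }

Treating only NOT_FOUND as "nothing to delete" is what keeps a transient listing error from being mistaken for a clean overwrite target.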
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
index 282361c4cb6..c0e48a13466 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.fs.remote;
 
 import org.apache.doris.common.CustomThreadFactory;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +28,7 @@ import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -63,6 +65,8 @@ public class RemoteFSPhantomManager {
     private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap
             = new ConcurrentHashMap<>();
 
+    private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
+
     // Flag indicating whether the cleanup thread has been started
     private static final AtomicBoolean isStarted = new AtomicBoolean(false);
 
@@ -77,9 +81,13 @@ public class RemoteFSPhantomManager {
             start();
             isStarted.set(true);
         }
+        if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
+            throw new RuntimeException("FileSystem already exists: " + remoteFileSystem.dfsFileSystem.getUri());
+        }
         RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(remoteFileSystem,
                 referenceQueue);
         referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
+        fsSet.add(remoteFileSystem.dfsFileSystem);
     }
 
     /**
@@ -102,6 +110,7 @@ public class RemoteFSPhantomManager {
             if (fs != null) {
                 try {
                     fs.close();
+                    fsSet.remove(fs);
                     LOG.info("Closed file system: {}", fs.getUri());
                 } catch (IOException e) {
                     LOG.warn("Failed to close file system", e);
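[Note, not part of the patch] RemoteFSPhantomManager closes the underlying Hadoop FileSystem once its owning RemoteFileSystem becomes unreachable; the new fsSet only rejects registering the same FileSystem instance twice, which would otherwise set up two phantom references racing to close one handle. A generic sketch of the phantom-reference cleanup pattern using plain JDK types (class and method names here are illustrative, not the Doris ones):

    import java.io.Closeable;
    import java.io.IOException;
    import java.lang.ref.PhantomReference;
    import java.lang.ref.Reference;
    import java.lang.ref.ReferenceQueue;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // O is the object whose unreachability triggers cleanup; Closeable is the
    // underlying resource (in the patch: the Hadoop FileSystem) to be closed.
    public class PhantomCleanupSketch<O> {
        private final ReferenceQueue<O> queue = new ReferenceQueue<>();
        private final Map<Reference<? extends O>, Closeable> resources = new ConcurrentHashMap<>();

        // Register once per resource; a duplicate would mean two phantom references
        // both try to close the same resource, which is what the fsSet guard forbids.
        public void register(O owner, Closeable resource) {
            if (resources.containsValue(resource)) {
                throw new IllegalStateException("resource already registered");
            }
            resources.put(new PhantomReference<>(owner, queue), resource);
        }

        // Poll from a background thread; references appear on the queue only
        // after the owner has been garbage collected.
        public void drainOnce() {
            Reference<? extends O> ref;
            while ((ref = queue.poll()) != null) {
                Closeable resource = resources.remove(ref);
                if (resource != null) {
                    try {
                        resource.close();
                    } catch (IOException e) {
                        // log and continue; cleanup is best effort
                    }
                }
                ref.clear();
            }
        }
    }

drainOnce() would be invoked from a scheduled thread, which is what the manager's cleanup thread does with its reference queue.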
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 87ba086baec..f8805bd0d4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -20,6 +20,8 @@ package org.apache.doris.fs.remote;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.obj.S3ObjStorage;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -34,6 +36,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -74,12 +77,20 @@ public class S3FileSystem extends ObjFileSystem {
             PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
                     .filter(entry -> entry.getKey() != null && entry.getValue() != null)
                     .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
+            AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
+            HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
             try {
-                dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
+                dfsFileSystem = authenticator.doAs(() -> {
+                    try {
+                        return FileSystem.get(new Path(remotePath).toUri(), conf);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+                RemoteFSPhantomManager.registerPhantomReference(this);
             } catch (Exception e) {
                 throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
             }
-            RemoteFSPhantomManager.registerPhantomReference(this);
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7034641a9fc..2146472aec7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -99,11 +99,11 @@ public class DFSFileSystem extends RemoteFileSystem {
                         throw new RuntimeException(e);
                     }
                 });
+                operations = new HDFSFileOperations(dfsFileSystem);
+                RemoteFSPhantomManager.registerPhantomReference(this);
             } catch (Exception e) {
-                throw new UserException(e);
+                throw new UserException("Failed to get dfs FileSystem for " + e.getMessage(), e);
             }
-            operations = new HDFSFileOperations(dfsFileSystem);
-            RemoteFSPhantomManager.registerPhantomReference(this);
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 97671581b29..93774d49e37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -133,7 +133,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
         if (insertCtx.isPresent()) {
             HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
             tSink.setOverwrite(context.isOverwrite());
-            context.setWritePath(storageLocation);
+            context.setWritePath(location);
             context.setFileType(fileType);
         }
     } else {
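[Note, not part of the patch] In S3FileSystem and DFSFileSystem the important ordering change is that FileSystem.get now runs inside the authenticator's doAs, and the phantom reference is registered only after the FileSystem has actually been created, so a failed creation surfaces as a UserException instead of leaving a half-registered handle. A rough equivalent using raw Hadoop UserGroupInformation, assuming keytab-based Kerberos; the principal and keytab arguments are placeholders, while the patch derives credentials from AuthenticationConfig/HadoopAuthenticator:

    import java.net.URI;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberizedFsSketch {
        // principal/keytab are illustrative; catalog properties supply them in Doris.
        public static FileSystem open(URI uri, Configuration conf,
                                      String principal, String keytab) throws Exception {
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation ugi =
                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
            // FileSystem.get must run as the authenticated user, otherwise the
            // handle is created with the wrong (or no) credentials.
            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(uri, conf));
        }
    }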
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org