This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1d0ff953561 [bugfix](hive) fix data error after insert overwrite of Hive table #43049 (#43311)
1d0ff953561 is described below

commit 1d0ff953561d8237bd2081eef644b7ec61d3dd0e
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Wed Nov 6 15:09:01 2024 +0800

    [bugfix](hive) fix data error after insert overwrite of Hive table #43049 (#43311)
    
    Cherry-picked from #43049
    
    Co-authored-by: wuwenchi <wuwenchi...@hotmail.com>
---
 .../doris/datasource/hive/HMSTransaction.java      | 30 ++++++++++++++++------
 .../doris/fs/remote/RemoteFSPhantomManager.java    |  9 +++++++
 .../org/apache/doris/fs/remote/S3FileSystem.java   | 15 +++++++++--
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  6 ++---
 .../org/apache/doris/planner/HiveTableSink.java    |  2 +-
 5 files changed, 48 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 6183c277c1b..02c99a695c8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -101,7 +101,7 @@ public class HMSTransaction implements Transaction {
     private final Executor fileSystemExecutor;
     private HmsCommitter hmsCommitter;
     private List<THivePartitionUpdate> hivePartitionUpdates = 
Lists.newArrayList();
-    private String declaredIntentionsToWrite;
+    private Optional<String> stagingDirectory;
     private boolean isMockedPartitionUpdate = false;
 
     private static class UncompletedMpuPendingUpload {
@@ -184,10 +184,14 @@ public class HMSTransaction implements Transaction {
     }
 
     public void beginInsertTable(HiveInsertCommandContext ctx) {
-        declaredIntentionsToWrite = ctx.getWritePath();
         queryId = ctx.getQueryId();
         isOverwrite = ctx.isOverwrite();
         fileType = ctx.getFileType();
+        if (fileType == TFileType.FILE_S3) {
+            stagingDirectory = Optional.empty();
+        } else {
+            stagingDirectory = Optional.of(ctx.getWritePath());
+        }
     }
 
     public void finishInsertTable(SimpleTableInfo tableInfo) {
@@ -207,10 +211,12 @@ public class HMSTransaction implements Transaction {
                             }
                         });
                     } else {
-                        fs.makeDir(declaredIntentionsToWrite);
-                        setLocation(new THiveLocationParams() {{
-                                setWritePath(declaredIntentionsToWrite);
-                            }
+                        stagingDirectory.ifPresent((v) -> {
+                            fs.makeDir(v);
+                            setLocation(new THiveLocationParams() {{
+                                    setWritePath(v);
+                                }
+                            });
                         });
                     }
                 }
@@ -643,15 +649,23 @@ public class HMSTransaction implements Transaction {
         if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
             LOG.warn("Failed to delete directory {}. Some eligible items can't 
be deleted: {}.",
                     directory.toString(), 
deleteResult.getNotDeletedEligibleItems());
+            throw new RuntimeException(
+                "Failed to delete directory for files: " + 
deleteResult.getNotDeletedEligibleItems());
         } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
             LOG.warn("Failed to delete directory {} due to dir isn't empty", 
directory.toString());
+            throw new RuntimeException("Failed to delete directory for empty 
dir: " + directory.toString());
         }
     }
 
     private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir, boolean reverse) {
         try {
-            if (!fs.directoryExists(directory.toString()).ok()) {
+            Status status = fs.directoryExists(directory.toString());
+            if (status.getErrCode().equals(Status.ErrCode.NOT_FOUND)) {
                 return new DeleteRecursivelyResult(true, ImmutableList.of());
+            } else if (!status.ok()) {
+                ImmutableList.Builder<String> notDeletedEligibleItems = 
ImmutableList.builder();
+                notDeletedEligibleItems.add(directory.toString() + "/*");
+                return new DeleteRecursivelyResult(false, 
notDeletedEligibleItems.build());
             }
         } catch (Exception e) {
             ImmutableList.Builder<String> notDeletedEligibleItems = 
ImmutableList.builder();
@@ -1447,7 +1461,7 @@ public class HMSTransaction implements Transaction {
         }
 
         private void pruneAndDeleteStagingDirectories() {
-            recursiveDeleteItems(new Path(declaredIntentionsToWrite), true, 
false);
+            stagingDirectory.ifPresent((v) -> recursiveDeleteItems(new 
Path(v), true, false));
         }
 
         private void abortMultiUploads() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
index 282361c4cb6..c0e48a13466 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.fs.remote;
 
 import org.apache.doris.common.CustomThreadFactory;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +28,7 @@ import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -63,6 +65,8 @@ public class RemoteFSPhantomManager {
     private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, 
FileSystem> referenceMap
             = new ConcurrentHashMap<>();
 
+    private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
+
     // Flag indicating whether the cleanup thread has been started
     private static final AtomicBoolean isStarted = new AtomicBoolean(false);
 
@@ -77,9 +81,13 @@ public class RemoteFSPhantomManager {
             start();
             isStarted.set(true);
         }
+        if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
+            throw new RuntimeException("FileSystem already exists: " + 
remoteFileSystem.dfsFileSystem.getUri());
+        }
         RemoteFileSystemPhantomReference phantomReference = new 
RemoteFileSystemPhantomReference(remoteFileSystem,
                 referenceQueue);
         referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
+        fsSet.add(remoteFileSystem.dfsFileSystem);
     }
 
     /**
@@ -102,6 +110,7 @@ public class RemoteFSPhantomManager {
                             if (fs != null) {
                                 try {
                                     fs.close();
+                                    fsSet.remove(fs);
                                     LOG.info("Closed file system: {}", 
fs.getUri());
                                 } catch (IOException e) {
                                     LOG.warn("Failed to close file system", e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 87ba086baec..f8805bd0d4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -20,6 +20,8 @@ package org.apache.doris.fs.remote;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.obj.S3ObjStorage;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -34,6 +36,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -74,12 +77,20 @@ public class S3FileSystem extends ObjFileSystem {
                     
PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
                             .filter(entry -> entry.getKey() != null && 
entry.getValue() != null)
                             .forEach(entry -> conf.set(entry.getKey(), 
entry.getValue()));
+                    AuthenticationConfig authConfig = 
AuthenticationConfig.getKerberosConfig(conf);
+                    HadoopAuthenticator authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(authConfig);
                     try {
-                        dfsFileSystem = FileSystem.get(new 
Path(remotePath).toUri(), conf);
+                        dfsFileSystem = authenticator.doAs(() -> {
+                            try {
+                                return FileSystem.get(new 
Path(remotePath).toUri(), conf);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+                        RemoteFSPhantomManager.registerPhantomReference(this);
                     } catch (Exception e) {
                         throw new UserException("Failed to get S3 FileSystem 
for " + e.getMessage(), e);
                     }
-                    RemoteFSPhantomManager.registerPhantomReference(this);
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7034641a9fc..2146472aec7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -99,11 +99,11 @@ public class DFSFileSystem extends RemoteFileSystem {
                                 throw new RuntimeException(e);
                             }
                         });
+                        operations = new HDFSFileOperations(dfsFileSystem);
+                        RemoteFSPhantomManager.registerPhantomReference(this);
                     } catch (Exception e) {
-                        throw new UserException(e);
+                        throw new UserException("Failed to get dfs FileSystem 
for " + e.getMessage(), e);
                     }
-                    operations = new HDFSFileOperations(dfsFileSystem);
-                    RemoteFSPhantomManager.registerPhantomReference(this);
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index d1f8ab411ea..168f92c113c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -133,7 +133,7 @@ public class HiveTableSink extends 
BaseExternalTableDataSink {
             if (insertCtx.isPresent()) {
                 HiveInsertCommandContext context = (HiveInsertCommandContext) 
insertCtx.get();
                 tSink.setOverwrite(context.isOverwrite());
-                context.setWritePath(storageLocation);
+                context.setWritePath(location);
                 context.setFileType(fileType);
             }
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to