morningman commented on code in PR #44001:
URL: https://github.com/apache/doris/pull/44001#discussion_r1888191170


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java:
##########
@@ -742,121 +738,37 @@ public LoadingCache<PartitionCacheKey, HivePartition> 
getPartitionCache() {
         return partitionCache;
     }
 
-    public List<FileCacheValue> getFilesByTransaction(List<HivePartition> 
partitions, ValidWriteIdList validWriteIds,
-            boolean isFullAcid, boolean skipCheckingAcidVersionFile, long 
tableId, String bindBrokerName) {
+    public List<FileCacheValue> getFilesByTransaction(List<HivePartition> 
partitions, Map<String, String> txnValidIds,
+            boolean isFullAcid, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
         String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
+
         try {
+            if (partitions.isEmpty()) {
+                return fileCacheValues;
+            }
+
             for (HivePartition partition : partitions) {
-                FileCacheValue fileCacheValue = new FileCacheValue();
-                AcidUtils.Directory directory;
+                // Get the filesystem separately for each partition; see https://github.com/apache/doris/pull/23409 for the reason.
+                RemoteFileSystem fileSystem = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
+                        new FileSystemCache.FileSystemCacheKey(
+                                
LocationPath.getFSIdentity(partition.getPath(), bindBrokerName),
+                                catalog.getCatalogProperty().getProperties(), 
bindBrokerName, jobConf));
+
                 if (!Strings.isNullOrEmpty(remoteUser)) {
                     UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(remoteUser);
-                    directory = 
ugi.doAs((PrivilegedExceptionAction<AcidUtils.Directory>) () -> 
AcidUtils.getAcidState(
-                            new Path(partition.getPath()), jobConf, 
validWriteIds, false, true));
+                    fileCacheValues.add(
+                            
ugi.doAs((PrivilegedExceptionAction<FileCacheValue>) () -> 
AcidUtil.getAcidState(
+                            fileSystem, partition, txnValidIds, 
catalog.getProperties()))
+                    );
                 } else {
-                    directory = AcidUtils.getAcidState(new 
Path(partition.getPath()), jobConf, validWriteIds, false,
-                            true);
-                }
-                if (directory == null) {
-                    return Collections.emptyList();
-                }
-                if (!directory.getOriginalFiles().isEmpty()) {
-                    throw new Exception("Original non-ACID files in 
transactional tables are not supported");
-                }
-
-                if (isFullAcid) {
-                    int acidVersion = 2;
-                    /**
-                     * From Hive version >= 3.0, delta/base files will always 
have file '_orc_acid_version'
-                     * with value >= '2'.
-                     */
-                    Path baseOrDeltaPath = directory.getBaseDirectory() != 
null ? directory.getBaseDirectory() :
-                            !directory.getCurrentDirectories().isEmpty() ? 
directory.getCurrentDirectories().get(0)
-                                    .getPath() : null;
-                    if (baseOrDeltaPath == null) {
-                        return Collections.emptyList();
-                    }
-                    if (!skipCheckingAcidVersionFile) {
-                        String acidVersionPath = new Path(baseOrDeltaPath, 
"_orc_acid_version").toUri().toString();
-                        RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                                new FileSystemCache.FileSystemCacheKey(
-                                        
LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
-                                                bindBrokerName),
-                                        
catalog.getCatalogProperty().getProperties(),
-                                        bindBrokerName, jobConf));
-                        Status status = fs.exists(acidVersionPath);
-                        if (status != Status.OK) {
-                            if (status.getErrCode() == ErrCode.NOT_FOUND) {
-                                acidVersion = 0;
-                            } else {
-                                throw new Exception(String.format("Failed to 
check remote path {} exists.",
-                                        acidVersionPath));
-                            }
-                        }
-                        if (acidVersion == 0 && 
!directory.getCurrentDirectories().isEmpty()) {
-                            throw new Exception(
-                                    "Hive 2.x versioned full-acid tables need 
to run major compaction.");
-                        }
-                    }
-                }
-
-                // delta directories
-                List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
-                for (AcidUtils.ParsedDelta delta : 
directory.getCurrentDirectories()) {
-                    String location = delta.getPath().toString();
-                    RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(
-                                    LocationPath.getFSIdentity(location, 
bindBrokerName),
-                                            
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
-                    List<RemoteFile> remoteFiles = new ArrayList<>();
-                    Status status = fs.listFiles(location, false, remoteFiles);
-                    if (status.ok()) {
-                        if (delta.isDeleteDelta()) {
-                            List<String> deleteDeltaFileNames = 
remoteFiles.stream().map(f -> f.getName()).filter(
-                                            name -> 
name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
-                                    .collect(Collectors.toList());
-                            deleteDeltas.add(new DeleteDeltaInfo(location, 
deleteDeltaFileNames));
-                            continue;
-                        }
-                        remoteFiles.stream().filter(
-                                f -> 
f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
-                                    LocationPath path = new 
LocationPath(file.getPath().toString(),
-                                            catalog.getProperties());
-                                    fileCacheValue.addFile(file, path);
-                                });
-                    } else {
-                        throw new RuntimeException(status.getErrMsg());
-                    }
-                }
-
-                // base
-                if (directory.getBaseDirectory() != null) {
-                    String location = directory.getBaseDirectory().toString();
-                    RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(
-                                    LocationPath.getFSIdentity(location, 
bindBrokerName),
-                                            
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
-                    List<RemoteFile> remoteFiles = new ArrayList<>();
-                    Status status = fs.listFiles(location, false, remoteFiles);
-                    if (status.ok()) {
-                        remoteFiles.stream().filter(
-                                        f -> 
f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
-                                .forEach(file -> {
-                                    LocationPath path = new 
LocationPath(file.getPath().toString(),
-                                            catalog.getProperties());
-                                    fileCacheValue.addFile(file, path);
-                                });
-                    } else {
-                        throw new RuntimeException(status.getErrMsg());
-                    }
+                    fileCacheValues.add(AcidUtil.getAcidState(

Review Comment:
   This call also needs to be wrapped in `ugi.doAs`, the same as the branch above.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java:
##########
@@ -267,7 +266,14 @@ private void getFileSplitByPartitions(HiveMetaStoreCache 
cache, List<HivePartiti
             List<Split> allFiles, String bindBrokerName, int numBackends) 
throws IOException, UserException {
         List<FileCacheValue> fileCaches;
         if (hiveTransaction != null) {
-            fileCaches = getFileSplitByTransaction(cache, partitions, 
bindBrokerName);
+            try {
+                fileCaches = getFileSplitByTransaction(cache, partitions, 
bindBrokerName);
+            } catch (Exception e) {
+                // Release the shared lock (getValidWriteIds acquires a lock).
+                // If no exception is thrown, the lock will be released in 
`finalizeQuery()`.
+                
Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId());

Review Comment:
   It is not a good design to `deregister` the transaction here.
   Releasing the lock in a catch block, far from where it was acquired, looks very error-prone.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to