morningman commented on code in PR #44001: URL: https://github.com/apache/doris/pull/44001#discussion_r1888191170
########## fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java: ########## @@ -742,121 +738,37 @@ public LoadingCache<PartitionCacheKey, HivePartition> getPartitionCache() { return partitionCache; } - public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds, - boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) { + public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, Map<String, String> txnValidIds, + boolean isFullAcid, String bindBrokerName) { List<FileCacheValue> fileCacheValues = Lists.newArrayList(); String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME); + try { + if (partitions.isEmpty()) { + return fileCacheValues; + } + for (HivePartition partition : partitions) { - FileCacheValue fileCacheValue = new FileCacheValue(); - AcidUtils.Directory directory; + //Get filesystem multiple times, reason: https://github.com/apache/doris/pull/23409. 
+ RemoteFileSystem fileSystem = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( + new FileSystemCache.FileSystemCacheKey( + LocationPath.getFSIdentity(partition.getPath(), bindBrokerName), + catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); + if (!Strings.isNullOrEmpty(remoteUser)) { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser); - directory = ugi.doAs((PrivilegedExceptionAction<AcidUtils.Directory>) () -> AcidUtils.getAcidState( - new Path(partition.getPath()), jobConf, validWriteIds, false, true)); + fileCacheValues.add( + ugi.doAs((PrivilegedExceptionAction<FileCacheValue>) () -> AcidUtil.getAcidState( + fileSystem, partition, txnValidIds, catalog.getProperties())) + ); } else { - directory = AcidUtils.getAcidState(new Path(partition.getPath()), jobConf, validWriteIds, false, - true); - } - if (directory == null) { - return Collections.emptyList(); - } - if (!directory.getOriginalFiles().isEmpty()) { - throw new Exception("Original non-ACID files in transactional tables are not supported"); - } - - if (isFullAcid) { - int acidVersion = 2; - /** - * From Hive version >= 3.0, delta/base files will always have file '_orc_acid_version' - * with value >= '2'. - */ - Path baseOrDeltaPath = directory.getBaseDirectory() != null ? directory.getBaseDirectory() : - !directory.getCurrentDirectories().isEmpty() ? 
directory.getCurrentDirectories().get(0) - .getPath() : null; - if (baseOrDeltaPath == null) { - return Collections.emptyList(); - } - if (!skipCheckingAcidVersionFile) { - String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(), - bindBrokerName), - catalog.getCatalogProperty().getProperties(), - bindBrokerName, jobConf)); - Status status = fs.exists(acidVersionPath); - if (status != Status.OK) { - if (status.getErrCode() == ErrCode.NOT_FOUND) { - acidVersion = 0; - } else { - throw new Exception(String.format("Failed to check remote path {} exists.", - acidVersionPath)); - } - } - if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) { - throw new Exception( - "Hive 2.x versioned full-acid tables need to run major compaction."); - } - } - } - - // delta directories - List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>(); - for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) { - String location = delta.getPath().toString(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(location, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - List<RemoteFile> remoteFiles = new ArrayList<>(); - Status status = fs.listFiles(location, false, remoteFiles); - if (status.ok()) { - if (delta.isDeleteDelta()) { - List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( - name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .collect(Collectors.toList()); - deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); - continue; - } - remoteFiles.stream().filter( - f -> 
f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> { - LocationPath path = new LocationPath(file.getPath().toString(), - catalog.getProperties()); - fileCacheValue.addFile(file, path); - }); - } else { - throw new RuntimeException(status.getErrMsg()); - } - } - - // base - if (directory.getBaseDirectory() != null) { - String location = directory.getBaseDirectory().toString(); - RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( - new FileSystemCache.FileSystemCacheKey( - LocationPath.getFSIdentity(location, bindBrokerName), - catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf)); - List<RemoteFile> remoteFiles = new ArrayList<>(); - Status status = fs.listFiles(location, false, remoteFiles); - if (status.ok()) { - remoteFiles.stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .forEach(file -> { - LocationPath path = new LocationPath(file.getPath().toString(), - catalog.getProperties()); - fileCacheValue.addFile(file, path); - }); - } else { - throw new RuntimeException(status.getErrMsg()); - } + fileCacheValues.add(AcidUtil.getAcidState( Review Comment: also need ugi.doAs ########## fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java: ########## @@ -267,7 +266,14 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartiti List<Split> allFiles, String bindBrokerName, int numBackends) throws IOException, UserException { List<FileCacheValue> fileCaches; if (hiveTransaction != null) { - fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + try { + fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName); + } catch (Exception e) { + // Release shared load (getValidWriteIds acquire Lock). + // If no exception is throw, the lock will be released when `finalizeQuery()`. 
+ Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId()); Review Comment: It is not a good design to `deregister` the transaction here. It looks very error-prone -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org