This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a8a1dbebc40 [bugfix](hudi) catch exception when getting hudi partition (#35027) (#41342)
a8a1dbebc40 is described below

commit a8a1dbebc407fede312d894aed2847c83023a118
Author: zhangyuan <ayuanzh...@tencent.com>
AuthorDate: Tue Nov 19 19:03:37 2024 +0800

    [bugfix](hudi) catch exception when getting hudi partition (#35027) (#41342)
    
    bp https://github.com/apache/doris/pull/35027
    
    Hudi uses a thread pool to fetch the files of each partition, and a
    CountDownLatch to wait for all of those threads to finish. If a worker
    thread throws an exception, however, the latch is never counted down
    and the waiting thread blocks forever.
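    
    The fix follows a common latch-with-error-propagation pattern; below is a
    minimal standalone sketch of that pattern (the class name, pool size, and
    simulated failure are illustrative, not taken from the Doris code). Each
    worker wraps its whole body in try/catch, records the failure in an
    AtomicReference, always counts the latch down in finally, and the waiting
    side rethrows after await() returns. The sketch keeps only the first
    failure via compareAndSet; the actual patch simply calls set.
    
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.atomic.AtomicReference;
    
        public class LatchErrorPropagationSketch {
            public static void main(String[] args) throws InterruptedException {
                ExecutorService executor = Executors.newFixedThreadPool(4);
                int taskCount = 8;
                CountDownLatch latch = new CountDownLatch(taskCount);
                AtomicReference<Throwable> error = new AtomicReference<>();
                for (int i = 0; i < taskCount; i++) {
                    final int id = i;
                    executor.execute(() -> {
                        try {
                            if (id == 3) {
                                // Simulate a partition whose file listing fails.
                                throw new RuntimeException("failed to list partition " + id);
                            }
                        } catch (Throwable t) {
                            // Record the failure instead of letting it die silently in the pool.
                            error.compareAndSet(null, t);
                        } finally {
                            // Always count down, even on failure, so await() cannot hang.
                            latch.countDown();
                        }
                    });
                }
                latch.await();
                executor.shutdown();
                if (error.get() != null) {
                    // Surface the worker failure to the caller.
                    throw new RuntimeException(error.get().getMessage(), error.get());
                }
                System.out.println("all partitions listed");
            }
        }
    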
---
 .../doris/planner/external/hudi/HudiScanNode.java  | 122 +++++++++++----------
 1 file changed, 67 insertions(+), 55 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index 803dadae03d..ab417a19cf7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -72,6 +72,7 @@ import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class HudiScanNode extends HiveScanNode {
@@ -272,73 +273,84 @@ public class HudiScanNode extends HiveScanNode {
                 .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog())).getExecutor();
         List<Split> splits = Collections.synchronizedList(new ArrayList<>());
         CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
         partitions.forEach(partition -> executor.execute(() -> {
-            String globPath;
-            String partitionName = "";
-            if (partition.isDummyPartition()) {
-                globPath = hudiClient.getBasePathV2().toString() + "/*";
-            } else {
-                partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
-                        new Path(partition.getPath()));
-                globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
-            }
-            List<FileStatus> statuses;
             try {
-                statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
-                        new Path(globPath));
-            } catch (IOException e) {
-                throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
-            }
-            HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
-                    timeline, statuses.toArray(new FileStatus[0]));
+                String globPath;
+                String partitionName = "";
+                if (partition.isDummyPartition()) {
+                    globPath = hudiClient.getBasePathV2().toString() + "/*";
+                } else {
+                    partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                            new Path(partition.getPath()));
+                    globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
+                }
+                List<FileStatus> statuses;
+                try {
+                    statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
+                            new Path(globPath));
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
+                }
+                HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
+                        timeline, statuses.toArray(new FileStatus[0]));
 
-            if (isCowOrRoTable) {
-                fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
-                    noLogsSplitNum.incrementAndGet();
-                    String filePath = baseFile.getPath();
-                    long fileSize = baseFile.getFileSize();
-                    // Need add hdfs host to location
-                    LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
-                    Path splitFilePath = locationPath.toScanRangeLocation();
-                    splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
-                            new String[0], partition.getPartitionValues()));
-                });
-            } else {
-                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> {
-                    Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
-                    String filePath = baseFile.map(BaseFile::getPath).orElse("");
-                    long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
+                if (isCowOrRoTable) {
+                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant)
+                            .forEach(baseFile -> {
+                                noLogsSplitNum.incrementAndGet();
+                                String filePath = baseFile.getPath();
+                                long fileSize = baseFile.getFileSize();
+                                // Need add hdfs host to location
+                                LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
+                                Path splitFilePath = locationPath.toScanRangeLocation();
+                                splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
+                                        new String[0], partition.getPartitionValues()));
+                            });
+                } else {
+                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                            .forEach(fileSlice -> {
+                                Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
+                                String filePath = baseFile.map(BaseFile::getPath).orElse("");
+                                long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
 
-                    List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
-                            .map(Path::toString)
-                            .collect(Collectors.toList());
-                    if (logs.isEmpty()) {
-                        noLogsSplitNum.incrementAndGet();
-                    }
+                                List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
+                                        .map(Path::toString)
+                                        .collect(Collectors.toList());
+                                if (logs.isEmpty()) {
+                                    noLogsSplitNum.incrementAndGet();
+                                }
 
-                    // no base file, use log file to parse file type
-                    String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
-                    HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
-                            new String[0], partition.getPartitionValues());
-                    split.setTableFormatType(TableFormatType.HUDI);
-                    split.setDataFilePath(filePath);
-                    split.setHudiDeltaLogs(logs);
-                    split.setInputFormat(inputFormat);
-                    split.setSerde(serdeLib);
-                    split.setBasePath(basePath);
-                    split.setHudiColumnNames(columnNames);
-                    split.setHudiColumnTypes(columnTypes);
-                    split.setInstantTime(queryInstant);
-                    splits.add(split);
-                });
+                                // no base file, use log file to parse file type
+                                String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
+                                HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
+                                        new String[0], partition.getPartitionValues());
+                                split.setTableFormatType(TableFormatType.HUDI);
+                                split.setDataFilePath(filePath);
+                                split.setHudiDeltaLogs(logs);
+                                split.setInputFormat(inputFormat);
+                                split.setSerde(serdeLib);
+                                split.setBasePath(basePath);
+                                split.setHudiColumnNames(columnNames);
+                                split.setHudiColumnTypes(columnTypes);
+                                split.setInstantTime(queryInstant);
+                                splits.add(split);
+                            });
+                }
+            } catch (Throwable t) {
+                throwable.set(t);
+            } finally {
+                countDownLatch.countDown();
             }
-            countDownLatch.countDown();
         }));
         try {
             countDownLatch.await();
         } catch (InterruptedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+        if (throwable.get() != null) {
+            throw new RuntimeException(throwable.get().getMessage(), throwable.get());
+        }
         return splits;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
