This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0ad3b1b5eca17d604546957ff71510f1dc947ac7
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Fri Sep 13 11:05:11 2024 +0800

    [bugfix](hive)Handle exceptions when submitting tasks fails to prevent dead loop (#40708)

    ## Proposed changes

    When the thread pool is full, an exception will be thrown and we need to handle this exception
---
 .../doris/datasource/hive/source/HiveScanNode.java | 44 +++++++++++-----------
 1 file changed, 23 insertions(+), 21 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index be722b31c7b..e09161fcf74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -260,30 +260,32 @@ public class HiveScanNode extends FileQueryScanNode {
                 }
                 try {
                     splittersOnFlight.acquire();
-                } catch (InterruptedException e) {
+                    CompletableFuture.runAsync(() -> {
+                        try {
+                            List<Split> allFiles = Lists.newArrayList();
+                            getFileSplitByPartitions(
+                                    cache, Collections.singletonList(partition), allFiles, bindBrokerName);
+                            if (allFiles.size() > numSplitsPerPartition.get()) {
+                                numSplitsPerPartition.set(allFiles.size());
+                            }
+                            splitAssignment.addToQueue(allFiles);
+                        } catch (IOException e) {
+                            batchException.set(new UserException(e.getMessage(), e));
+                        } finally {
+                            splittersOnFlight.release();
+                            if (batchException.get() != null) {
+                                splitAssignment.setException(batchException.get());
+                            }
+                            if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
+                                splitAssignment.finishSchedule();
+                            }
+                        }
+                    }, scheduleExecutor);
+                } catch (Exception e) {
+                    // When submitting a task, an exception will be thrown if the task pool(scheduleExecutor) is full
                     batchException.set(new UserException(e.getMessage(), e));
                     break;
                 }
-                CompletableFuture.runAsync(() -> {
-                    try {
-                        List<Split> allFiles = Lists.newArrayList();
-                        getFileSplitByPartitions(cache, Collections.singletonList(partition), allFiles, bindBrokerName);
-                        if (allFiles.size() > numSplitsPerPartition.get()) {
-                            numSplitsPerPartition.set(allFiles.size());
-                        }
-                        splitAssignment.addToQueue(allFiles);
-                    } catch (IOException e) {
-                        batchException.set(new UserException(e.getMessage(), e));
-                    } finally {
-                        splittersOnFlight.release();
-                        if (batchException.get() != null) {
-                            splitAssignment.setException(batchException.get());
-                        }
-                        if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
-                            splitAssignment.finishSchedule();
-                        }
-                    }
-                }, scheduleExecutor);
             }
             if (batchException.get() != null) {
                 splitAssignment.setException(batchException.get());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org