morningman commented on code in PR #51185:
URL: https://github.com/apache/doris/pull/51185#discussion_r2113607866


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -229,19 +231,21 @@ public void startSplit(int numBackends) throws UserException {
     public void doStartSplit() {
         TableScan scan = createTableScan();
         CompletableFuture.runAsync(() -> {
+            AtomicReference<CloseableIterable<FileScanTask>> taskRef = new AtomicReference<>();
             try {
                 preExecutionAuthenticator.execute(
                         () -> {
                             CloseableIterable<FileScanTask> fileScanTasks = planFileScanTask(scan);
-
-                            // 1. this task should stop when all splits are assigned
-                            // 2. if we want to stop this plan, we can close the fileScanTasks to stop
-                            splitAssignment.addCloseable(fileScanTasks);
-
-                            fileScanTasks.forEach(fileScanTask ->
-                                    splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(fileScanTask))));
-
-                            return null;
+                            taskRef.set(planFileScanTask(scan));

Review Comment:
   BTW, the `threadPoolWithPreAuth` in `ExternalCatalog.java` is only used in 



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -229,19 +231,21 @@ public void startSplit(int numBackends) throws UserException {
     public void doStartSplit() {
         TableScan scan = createTableScan();
         CompletableFuture.runAsync(() -> {
+            AtomicReference<CloseableIterable<FileScanTask>> taskRef = new AtomicReference<>();
             try {
                 preExecutionAuthenticator.execute(
                         () -> {
                             CloseableIterable<FileScanTask> fileScanTasks = planFileScanTask(scan);
-
-                            // 1. this task should stop when all splits are assigned
-                            // 2. if we want to stop this plan, we can close the fileScanTasks to stop
-                            splitAssignment.addCloseable(fileScanTasks);
-
-                            fileScanTasks.forEach(fileScanTask ->
-                                    splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(fileScanTask))));
-
-                            return null;
+                            taskRef.set(planFileScanTask(scan));

Review Comment:
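   `planFileScanTask(scan)` is called twice here: once to initialize `fileScanTasks` and again inside `taskRef.set(...)`. Is that intended?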
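
   A minimal sketch of planning once and publishing that same instance through the reference, assuming the purpose of `taskRef` is to let the outer block close the iterable afterwards. `PlanOnceSketch`, `CloseableTasks`, and the counting `planFileScanTask` below are hypothetical stand-ins for illustration, not the Doris or Iceberg APIs:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class PlanOnceSketch {
    // Hypothetical stand-in for CloseableIterable<FileScanTask>.
    interface CloseableTasks extends Iterable<String>, AutoCloseable {
        @Override
        void close();
    }

    // Hypothetical stand-in for planFileScanTask(scan); counts plan and close calls.
    static CloseableTasks planFileScanTask(AtomicInteger planCalls, AtomicInteger closeCalls) {
        planCalls.incrementAndGet();
        List<String> tasks = List.of("file-a", "file-b");
        return new CloseableTasks() {
            @Override
            public Iterator<String> iterator() {
                return tasks.iterator();
            }

            @Override
            public void close() {
                closeCalls.incrementAndGet();
            }
        };
    }

    static String run() {
        AtomicInteger planCalls = new AtomicInteger();
        AtomicInteger closeCalls = new AtomicInteger();
        AtomicReference<CloseableTasks> taskRef = new AtomicReference<>();
        List<String> queued = new ArrayList<>();
        try {
            // Plan exactly once, then store the same instance in taskRef.
            CloseableTasks fileScanTasks = planFileScanTask(planCalls, closeCalls);
            taskRef.set(fileScanTasks);
            for (String task : fileScanTasks) {
                queued.add(task);
            }
        } finally {
            // Close whatever was planned, even if iteration failed.
            CloseableTasks planned = taskRef.get();
            if (planned != null) {
                planned.close();
            }
        }
        return "queued=" + queued + " planCalls=" + planCalls.get() + " closeCalls=" + closeCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

   With a single call, the instance that gets iterated is the same one that gets closed. With two calls, the second plan is the one stored in the reference, so the first is never closed through it.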



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java:
##########
@@ -100,10 +108,22 @@ private void appendBatch(Multimap<Backend, Split> batch) throws UserException {
             for (Split split : splits) {
                 locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
             }
-            try {
-                assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>(10000)).put(locations);
-            } catch (Exception e) {
-                throw new UserException("Failed to offer batch split", e);
+            while (true) {
+                BlockingQueue<Collection<TScanRangeLocations>> queue =
+                        assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>(10000));
+                try {
+                    if (queue.offer(locations, 100, TimeUnit.MILLISECONDS)) {
+                        return;
+                    }
+                } catch (InterruptedException e) {
+                    throw new UserException("Failed to offer batch split by interrupted", e);
+                }
+                if (needMoreSplit()) {

Review Comment:
   This logic is wrong



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
