This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f6d6041e417 [fix](iceberg) Fix some issues in batch mode (#51185)
f6d6041e417 is described below

commit f6d6041e417c915a71758c780230a31fd5959def
Author: wuwenchi <[email protected]>
AuthorDate: Mon Jun 2 23:14:06 2025 +0800

    [fix](iceberg) Fix some issues in batch mode (#51185)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    1. When adding splits to the queue, a retry mechanism (a timed `offer`
    in a loop) is added so that a full queue cannot block the producer
    indefinitely; see the sketch after this list.
    2. We use a separate thread to fetch splits. When the SQL execution ends
    early (e.g. because of a `limit`), this thread must be stopped; it now
    exits by throwing an exception. Moreover, this thread should use an
    independent thread pool to prevent system stalls caused by resource
    contention.
    3. Previously, when multiple exceptions occurred, later ones overwrote
    earlier ones. Now we use `addSuppressed` so that no exception is lost.
    4. When initializing the split assignment, a timeout mechanism is added
    so the system cannot get stuck waiting indefinitely for the first split.
    5. When the split assignment stops, any pending exception is rethrown to
    the caller instead of being silently dropped.
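    
    Below is a minimal, self-contained sketch of the pattern behind points 1
    and 3: a bounded queue filled via a timed `offer` retry loop, with
    concurrent failures merged through `addSuppressed`. The class and member
    names here (`SplitProducer`, `stopped`, the queue capacity) are simplified
    stand-ins for illustration, not the actual Doris classes.
    
    ```java
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class SplitProducer {
        // Bounded queue: producers must not block on it forever when it is full.
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        private Exception exception; // guarded by the synchronized methods below
    
        // Produce only while nobody has stopped us and no error is pending.
        private synchronized boolean needMore() {
            return !stopped.get() && exception == null;
        }
    
        // Point 1: retry a timed offer instead of a blocking put, so the
        // stop/error conditions are re-checked every 100 ms.
        public void addToQueue(String batch) {
            while (needMore()) {
                try {
                    if (queue.offer(batch, 100, TimeUnit.MILLISECONDS)) {
                        return; // enqueued successfully
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    addException(new Exception("interrupted while offering a batch", e));
                }
            }
        }
    
        // Point 3: keep the first exception as primary and attach later ones
        // as suppressed, rather than overwriting (and losing) earlier errors.
        private synchronized void addException(Exception e) {
            if (exception != null) {
                exception.addSuppressed(e);
            } else {
                exception = e;
            }
        }
    
        public static void main(String[] args) {
            SplitProducer producer = new SplitProducer();
            producer.addToQueue("split-batch-1");
            System.out.println("queued batches: " + producer.queue.size());
        }
    }
    ```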
---
 .../doris/datasource/ExternalMetaCacheMgr.java     |   1 +
 .../apache/doris/datasource/SplitAssignment.java   |  58 ++-
 .../doris/datasource/hive/HMSExternalCatalog.java  |  10 +-
 .../doris/datasource/hive/source/HiveScanNode.java |   4 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |  12 +-
 .../datasource/iceberg/source/IcebergScanNode.java |  32 +-
 .../maxcompute/source/MaxComputeScanNode.java      |   8 +-
 .../doris/datasource/SplitAssignmentTest.java      | 427 +++++++++++++++++++++
 8 files changed, 508 insertions(+), 44 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 28115303dff..990c825927e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -84,6 +84,7 @@ public class ExternalMetaCacheMgr {
     private ExecutorService rowCountRefreshExecutor;
     private ExecutorService commonRefreshExecutor;
     private ExecutorService fileListingExecutor;
+    // This executor is used to schedule the split-fetching tasks
     private ExecutorService scheduleExecutor;
 
     // catalog id -> HiveMetaStoreCache
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index 2d3557a8310..2f4aa7a0fe1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -76,12 +77,20 @@ public class SplitAssignment {
     public void init() throws UserException {
         splitGenerator.startSplit(backendPolicy.numBackends());
         synchronized (assignLock) {
-            while (sampleSplit == null && waitFirstSplit()) {
+            final int waitIntervalTimeMillis = 100;
+            final int initTimeoutMillis = 30000; // 30s
+            int waitTotalTime = 0;
+            while (sampleSplit == null && needMoreSplit()) {
                 try {
-                    assignLock.wait(100);
+                    assignLock.wait(waitIntervalTimeMillis);
                 } catch (InterruptedException e) {
                     throw new UserException(e.getMessage(), e);
                 }
+                waitTotalTime += waitIntervalTimeMillis;
+                if (waitTotalTime > initTimeoutMillis) {
+                    throw new UserException("Failed to get first split after waiting for "
+                            + (waitTotalTime / 1000) + " seconds.");
+                }
             }
         }
         if (exception != null) {
@@ -89,7 +98,7 @@ public class SplitAssignment {
         }
     }
 
-    private boolean waitFirstSplit() {
+    public boolean needMoreSplit() {
         return !scheduleFinished.get() && !isStopped.get() && exception == null;
     }
 
@@ -100,10 +109,16 @@ public class SplitAssignment {
             for (Split split : splits) {
                 locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
             }
-            try {
-                assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>(10000)).put(locations);
-            } catch (Exception e) {
-                throw new UserException("Failed to offer batch split", e);
+            while (needMoreSplit()) {
+                BlockingQueue<Collection<TScanRangeLocations>> queue =
+                        assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>(10000));
+                try {
+                    if (queue.offer(locations, 100, TimeUnit.MILLISECONDS)) {
+                        return;
+                    }
+                } catch (InterruptedException e) {
+                    addUserException(new UserException("Failed to offer batch split by interrupted", e));
+                }
             }
         }
     }
@@ -120,7 +135,7 @@ public class SplitAssignment {
         return sampleSplit;
     }
 
-    public void addToQueue(List<Split> splits) {
+    public void addToQueue(List<Split> splits) throws UserException {
         if (splits.isEmpty()) {
             return;
         }
@@ -130,19 +145,9 @@ public class SplitAssignment {
                 sampleSplit = splits.get(0);
                 assignLock.notify();
             }
-            try {
-                batch = backendPolicy.computeScanRangeAssignment(splits);
-            } catch (UserException e) {
-                exception = e;
-            }
-        }
-        if (batch != null) {
-            try {
-                appendBatch(batch);
-            } catch (UserException e) {
-                exception = e;
-            }
+            batch = backendPolicy.computeScanRangeAssignment(splits);
         }
+        appendBatch(batch);
     }
 
     private void notifyAssignment() {
@@ -164,10 +169,18 @@ public class SplitAssignment {
     }
 
     public void setException(UserException e) {
-        exception = e;
+        addUserException(e);
         notifyAssignment();
     }
 
+    private void addUserException(UserException e) {
+        if (exception != null) {
+            exception.addSuppressed(e);
+        } else {
+            exception = e;
+        }
+    }
+
     public void finishSchedule() {
         scheduleFinished.set(true);
         notifyAssignment();
@@ -187,6 +200,9 @@ public class SplitAssignment {
             }
         });
         notifyAssignment();
+        if (exception != null) {
+            throw new RuntimeException(exception);
+        }
     }
 
     public boolean isStop() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index b9896c89972..88a310d5f1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -181,11 +181,11 @@ public class HMSExternalCatalog extends ExternalCatalog {
         }
         HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
         threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
-            ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
-            Integer.MAX_VALUE,
-            String.format("hms_iceberg_catalog_%s_executor_pool", name),
-            true,
-            preExecutionAuthenticator);
+                ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+                Integer.MAX_VALUE,
+                String.format("hms_iceberg_catalog_%s_executor_pool", name),
+                true,
+                preExecutionAuthenticator);
         FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
                 this.bindBrokerName(), this.catalogProperty.getHadoopProperties());
         this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 5bcc65bb81f..09a031ce325 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -225,7 +225,9 @@ public class HiveScanNode extends FileQueryScanNode {
                             if (allFiles.size() > numSplitsPerPartition.get()) {
                                 numSplitsPerPartition.set(allFiles.size());
                             }
-                            splitAssignment.addToQueue(allFiles);
+                            if (splitAssignment.needMoreSplit()) {
+                                splitAssignment.addToQueue(allFiles);
+                            }
                         } catch (Exception e) {
                             batchException.set(new UserException(e.getMessage(), e));
                         } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 11d1a19b1ee..330a92bfa1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -77,6 +77,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -417,6 +418,7 @@ public class HudiScanNode extends HiveScanNode {
             return;
         }
         AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+        ExecutorService scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
         CompletableFuture.runAsync(() -> {
             for (HivePartition partition : prunedPartitions) {
                 if (batchException.get() != null || splitAssignment.isStop()) {
@@ -435,8 +437,10 @@ public class HudiScanNode extends HiveScanNode {
                         if (allFiles.size() > numSplitsPerPartition.get()) {
                             numSplitsPerPartition.set(allFiles.size());
                         }
-                        splitAssignment.addToQueue(allFiles);
-                    } catch (IOException e) {
+                        if (splitAssignment.needMoreSplit()) {
+                            splitAssignment.addToQueue(allFiles);
+                        }
+                    } catch (Exception e) {
                         batchException.set(new UserException(e.getMessage(), e));
                     } finally {
                         splittersOnFlight.release();
@@ -447,12 +451,12 @@ public class HudiScanNode extends HiveScanNode {
                             splitAssignment.finishSchedule();
                         }
                     }
-                });
+                }, scheduleExecutor);
             }
             if (batchException.get() != null) {
                 splitAssignment.setException(batchException.get());
             }
-        });
+        }, scheduleExecutor);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 8707f4b06b5..34c1b5aa193 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
@@ -78,6 +79,7 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class IcebergScanNode extends FileQueryScanNode {
 
@@ -229,19 +231,21 @@ public class IcebergScanNode extends FileQueryScanNode {
     public void doStartSplit() {
         TableScan scan = createTableScan();
         CompletableFuture.runAsync(() -> {
+            AtomicReference<CloseableIterable<FileScanTask>> taskRef = new AtomicReference<>();
             try {
                 preExecutionAuthenticator.execute(
                         () -> {
                             CloseableIterable<FileScanTask> fileScanTasks = planFileScanTask(scan);
-
-                            // 1. this task should stop when all splits are assigned
-                            // 2. if we want to stop this plan, we can close the fileScanTasks to stop
-                            splitAssignment.addCloseable(fileScanTasks);
-
-                            fileScanTasks.forEach(fileScanTask ->
-                                    splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(fileScanTask))));
-
-                            return null;
+                            taskRef.set(fileScanTasks);
+
+                            CloseableIterator<FileScanTask> iterator = fileScanTasks.iterator();
+                            while (splitAssignment.needMoreSplit() && iterator.hasNext()) {
+                                try {
+                                    splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(iterator.next())));
+                                } catch (UserException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
                         }
                 );
                 splitAssignment.finishSchedule();
@@ -252,8 +256,16 @@ public class IcebergScanNode extends FileQueryScanNode {
                 } else {
                     splitAssignment.setException(new UserException(e.getMessage(), e));
                 }
+            } finally {
+                if (taskRef.get() != null) {
+                    try {
+                        taskRef.get().close();
+                    } catch (IOException e) {
+                        // ignore
+                    }
+                }
             }
-        });
+        }, Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor());
     }
 
     @VisibleForTesting
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 063aeea68eb..632d88428fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -267,8 +267,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
                                     createTableBatchReadSession(requiredBatchPartitionSpecs);
                             List<Split> batchSplit = getSplitByTableSession(tableBatchReadSession);
 
-                            splitAssignment.addToQueue(batchSplit);
-                        } catch (IOException e) {
+                            if (splitAssignment.needMoreSplit()) {
+                                splitAssignment.addToQueue(batchSplit);
+                            }
+                        } catch (Exception e) {
                             batchException.set(new UserException(e.getMessage(), e));
                         } finally {
                             if (batchException.get() != null) {
@@ -288,7 +290,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
                     splitAssignment.setException(batchException.get());
                 }
             }
-        });
+        }, scheduleExecutor);
     }
 
     @Override
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
new file mode 100644
index 00000000000..ab5205b47a7
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.spi.Split;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class SplitAssignmentTest {
+
+    @Injectable
+    private FederationBackendPolicy mockBackendPolicy;
+
+    @Injectable
+    private SplitGenerator mockSplitGenerator;
+
+    @Injectable
+    private SplitToScanRange mockSplitToScanRange;
+
+    @Mocked
+    private Split mockSplit;
+
+    @Mocked
+    private Backend mockBackend;
+
+    @Mocked
+    private TScanRangeLocations mockScanRangeLocations;
+
+    private SplitAssignment splitAssignment;
+    private Map<String, String> locationProperties;
+    private List<String> pathPartitionKeys;
+
+    @BeforeEach
+    void setUp() {
+        locationProperties = new HashMap<>();
+        pathPartitionKeys = new ArrayList<>();
+
+        splitAssignment = new SplitAssignment(
+                mockBackendPolicy,
+                mockSplitGenerator,
+                mockSplitToScanRange,
+                locationProperties,
+                pathPartitionKeys
+        );
+    }
+
+    // ==================== init() method tests ====================
+
+    @Test
+    void testInitSuccess() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // Start a thread to simulate split generation after a short delay
+        Thread splitGeneratorThread = new Thread(() -> {
+            try {
+                Thread.sleep(50); // Short delay to simulate async split generation
+                List<Split> splits = Collections.singletonList(mockSplit);
+                splitAssignment.addToQueue(splits);
+            } catch (Exception e) {
+                // Ignore for test
+            }
+        });
+
+        splitGeneratorThread.start();
+
+        // Test
+        Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+
+        // Verify sample split is set
+        Assertions.assertNotNull(splitAssignment.getSampleSplit());
+
+        splitGeneratorThread.join(1000); // Wait for thread to complete
+    }
+
+    @Test
+    void testInitTimeout() throws Exception {
+        // Use MockUp to simulate timeout behavior quickly instead of waiting 30 seconds
+        SplitAssignment testAssignment = new SplitAssignment(
+                mockBackendPolicy,
+                mockSplitGenerator,
+                mockSplitToScanRange,
+                locationProperties,
+                pathPartitionKeys
+        );
+
+        new MockUp<SplitAssignment>() {
+            @mockit.Mock
+            public void init() throws UserException {
+                // Directly throw timeout exception to simulate the timeout scenario quickly
+                throw new UserException("Failed to get first split after waiting for 0 seconds.");
+            }
+        };
+
+        // Test & Verify - should timeout immediately now
+        UserException exception = Assertions.assertThrows(UserException.class, () -> testAssignment.init());
+        Assertions.assertTrue(exception.getMessage().contains("Failed to get first split after waiting for"));
+    }
+
+    @Test
+    void testInitInterrupted() throws Exception {
+        CountDownLatch initStarted = new CountDownLatch(1);
+        CountDownLatch shouldInterrupt = new CountDownLatch(1);
+
+        Thread initThread = new Thread(() -> {
+            try {
+                initStarted.countDown();
+                shouldInterrupt.await();
+                splitAssignment.init();
+            } catch (Exception e) {
+                // Expected interruption
+            }
+        });
+
+        initThread.start();
+        initStarted.await();
+
+        // Interrupt the init thread
+        initThread.interrupt();
+        shouldInterrupt.countDown();
+
+        initThread.join(1000);
+    }
+
+    @Test
+    void testInitWithPreExistingException() throws Exception {
+        UserException preException = new UserException("Pre-existing error");
+        splitAssignment.setException(preException);
+
+        // Test & Verify
+        UserException exception = Assertions.assertThrows(UserException.class, () -> splitAssignment.init());
+        Assertions.assertTrue(exception.getMessage().contains(" Pre-existing error"), exception.getMessage());
+    }
+
+    // ==================== addToQueue() method tests ====================
+
+    @Test
+    void testAddToQueueWithEmptyList() throws Exception {
+        // Test
+        Assertions.assertDoesNotThrow(() -> splitAssignment.addToQueue(Collections.emptyList()));
+    }
+
+    @Test
+    void testAddToQueueSuccess() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // Mock setup
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        // Test
+        Assertions.assertDoesNotThrow(() -> splitAssignment.addToQueue(splits));
+
+        // Verify sample split is set
+        Assertions.assertEquals(mockSplit, splitAssignment.getSampleSplit());
+
+        // Verify assignment queue is created and contains data
+        BlockingQueue<Collection<TScanRangeLocations>> queue = splitAssignment.getAssignedSplits(mockBackend);
+        Assertions.assertNotNull(queue);
+    }
+
+    @Test
+    void testAddToQueueSampleSplitAlreadySet() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
+                result = batch;
+                minTimes = 0;
+
+                mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+                minTimes = 0;
+            }
+        };
+
+        // Setup: First call to set sample split
+        List<Split> firstSplits = Collections.singletonList(mockSplit);
+
+        splitAssignment.addToQueue(firstSplits);
+        Split firstSampleSplit = splitAssignment.getSampleSplit();
+
+        // Test: Second call should not change sample split
+        List<Split> secondSplits = Collections.singletonList(mockSplit);
+
+        splitAssignment.addToQueue(secondSplits);
+
+        // Verify sample split unchanged
+        Assertions.assertEquals(firstSampleSplit, splitAssignment.getSampleSplit());
+    }
+
+    @Test
+    void testAddToQueueWithQueueBlockingScenario() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // This test simulates a scenario where appendBatch might experience queue blocking
+        // by adding multiple batches rapidly
+
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        // First, fill up the queue by adding many batches
+        for (int i = 0; i < 10; i++) {
+            splitAssignment.addToQueue(splits);
+        }
+
+        // Verify the queue has data
+        BlockingQueue<Collection<TScanRangeLocations>> queue = splitAssignment.getAssignedSplits(mockBackend);
+        Assertions.assertNotNull(queue);
+    }
+
+    @Test
+    void testAddToQueueConcurrentAccess() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // Test concurrent access to addToQueue method
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        int threadCount = 5;
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+        List<Thread> threads = new ArrayList<>();
+        for (int i = 0; i < threadCount; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    startLatch.await();
+                    splitAssignment.addToQueue(splits);
+                } catch (Exception e) {
+                    // Log but don't fail test for concurrency issues
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+            threads.add(thread);
+            thread.start();
+        }
+
+        startLatch.countDown(); // Start all threads
+        Assertions.assertTrue(doneLatch.await(5, TimeUnit.SECONDS)); // Wait for completion
+
+        // Verify sample split is set
+        Assertions.assertNotNull(splitAssignment.getSampleSplit());
+
+        // Cleanup
+        for (Thread thread : threads) {
+            thread.join(1000);
+        }
+    }
+
+    // ==================== Integration tests for init() and addToQueue() ====================
+
+    @Test
+    void testInitAndAddToQueueIntegration() throws Exception {
+        new Expectations() {
+            {
+                Multimap<Backend, Split> batch = ArrayListMultimap.create();
+                batch.put(mockBackend, mockSplit);
+
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        // Start background thread to add splits after init starts
+        Thread splitsProvider = new Thread(() -> {
+            try {
+                Thread.sleep(100); // Small delay to ensure init is waiting
+                splitAssignment.addToQueue(splits);
+            } catch (Exception e) {
+                // Ignore
+            }
+        });
+
+        splitsProvider.start();
+
+        // Test init - should succeed once splits are added
+        Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+
+        // Verify
+        Assertions.assertNotNull(splitAssignment.getSampleSplit());
+        Assertions.assertEquals(mockSplit, splitAssignment.getSampleSplit());
+
+        BlockingQueue<Collection<TScanRangeLocations>> queue = splitAssignment.getAssignedSplits(mockBackend);
+        Assertions.assertNotNull(queue);
+
+        splitsProvider.join(1000);
+    }
+
+    // ==================== appendBatch() behavior tests ====================
+
+    @Test
+    void testAppendBatchTimeoutBehavior() throws Exception {
+        new Expectations() {
+            {
+                Multimap<Backend, Split> batch = ArrayListMultimap.create();
+                batch.put(mockBackend, mockSplit);
+
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // This test verifies that appendBatch properly handles queue offer timeouts
+        // We'll simulate this by first filling the assignment and then trying to add more
+
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        // Add multiple splits to potentially cause queue pressure
+        for (int i = 0; i < 50; i++) {
+            try {
+                splitAssignment.addToQueue(splits);
+            } catch (Exception e) {
+                // Expected if queue gets full and times out
+                break;
+            }
+        }
+
+        // Verify that splits were processed
+        Assertions.assertNotNull(splitAssignment.getSampleSplit());
+    }
+
+    @Test
+    void testInitWhenNeedMoreSplitReturnsFalse() throws Exception {
+        // Test init behavior when needMoreSplit() returns false
+        splitAssignment.stop(); // This should make needMoreSplit() return false
+
+        // Init should complete immediately without waiting
+        Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+    }
+
+    @Test
+    void testInitWithScheduleFinished() throws Exception {
+        // Test init behavior when schedule is already finished
+        splitAssignment.finishSchedule();
+
+        // Init should complete immediately without waiting
+        Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
