morningman commented on code in PR #45148:
URL: https://github.com/apache/doris/pull/45148#discussion_r1874352612


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java:
##########
@@ -156,7 +156,7 @@ public BlockingQueue<Collection<TScanRangeLocations>> 
getAssignedSplits(Backend
         return splits;
     }
 
-    public void setException(UserException e) {
+    public synchronized void setException(UserException e) {

Review Comment:
   Why using `synchronized `? add comment in code



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java:
##########
@@ -181,32 +174,118 @@ boolean createTableBatchReadSession() throws 
UserException {
                 orderedRequiredDataColumns.add(columnName);
             }
         }
+    }
 
+    /**
+     * For no partition table: request requiredPartitionSpecs is empty
+     * For partition table: if requiredPartitionSpecs is empty, get all 
partition data.
+     */
+    TableBatchReadSession createTableBatchReadSession(List<PartitionSpec> 
requiredPartitionSpecs) throws IOException {
+        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) 
table.getCatalog();
 
+        readTimeout = mcCatalog.getReadTimeout();
+        connectTimeout = mcCatalog.getConnectTimeout();
+        retryTimes = mcCatalog.getRetryTimes();
+
+        TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
+        return scanBuilder.identifier(TableIdentifier.of(table.getDbName(), 
table.getName()))
+                        .withSettings(mcCatalog.getSettings())
+                        .withSplitOptions(mcCatalog.getSplitOption())
+                        .requiredPartitionColumns(requiredPartitionColumns)
+                        .requiredDataColumns(orderedRequiredDataColumns)
+                        .withFilterPredicate(filterPredicate)
+                        .requiredPartitions(requiredPartitionSpecs)
+                        .withArrowOptions(
+                                ArrowOptions.newBuilder()
+                                        .withDatetimeUnit(TimestampUnit.MILLI)
+                                        .withTimestampUnit(TimestampUnit.NANO)
+                                        .build()
+                        ).buildBatchReadSession();
+    }
 
-        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) 
table.getCatalog();
+    @Override
+    public boolean isBatchMode() {
+        if (table.getPartitionColumns().isEmpty()) {
+            return false;
+        }
 
-        try {
-            TableReadSessionBuilder scanBuilder = new 
TableReadSessionBuilder();
-            tableBatchReadSession =
-                    
scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName()))
-                            .withSettings(mcCatalog.getSettings())
-                            .withSplitOptions(mcCatalog.getSplitOption())
-                            .requiredPartitionColumns(requiredPartitionColumns)
-                            .requiredPartitions(requiredPartitionSpecs)
-                            .requiredDataColumns(orderedRequiredDataColumns)
-                            .withArrowOptions(
-                                    ArrowOptions.newBuilder()
-                                            
.withDatetimeUnit(TimestampUnit.MILLI)
-                                            
.withTimestampUnit(TimestampUnit.NANO)
-                                            .build()
-                            )
-                            .withFilterPredicate(filterPredicate)
-                            .buildBatchReadSession();
-        } catch (java.io.IOException e) {
-            throw new RuntimeException(e);
+        com.aliyun.odps.Table odpsTable = table.getOdpsTable();
+        if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
+            return false;
         }
-        return true;
+
+        int numPartitions = 
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+        return numPartitions > 0
+                && selectedPartitions != SelectedPartitions.NOT_PRUNED
+                && selectedPartitions.selectedPartitions.size() >= 
numPartitions;
+    }
+
+    @Override
+    public int numApproximateSplits() {
+        return selectedPartitions.selectedPartitions.size();
+    }
+
+    @Override
+    public void startSplit() {
+        this.totalPartitionNum = selectedPartitions.totalPartitionNum;
+        this.selectedPartitionNum = 
selectedPartitions.selectedPartitions.size();
+
+        if (selectedPartitions.selectedPartitions.isEmpty()) {
+            //no need read any partition data.
+            return;
+        }
+
+        createRequiredColumns();
+        List<PartitionSpec> requiredPartitionSpecs = new ArrayList<>();
+        selectedPartitions.selectedPartitions.forEach(
+                (key, value) -> requiredPartitionSpecs.add(new 
PartitionSpec(key))
+        );
+
+
+        int batchNumPartitions = 
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+
+        Executor scheduleExecutor = 
Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
+        AtomicReference<UserException> batchException = new 
AtomicReference<>(null);
+        AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+
+        CompletableFuture.runAsync(() -> {
+            for (int beginIndex = 0; beginIndex < 
requiredPartitionSpecs.size(); beginIndex += batchNumPartitions) {
+                int endIndex = Math.min(beginIndex + batchNumPartitions, 
requiredPartitionSpecs.size());
+                if (batchException.get() != null || splitAssignment.isStop()) {
+                    break;
+                }
+                List<PartitionSpec> requiredBatchPartitionSpecs = 
requiredPartitionSpecs.subList(beginIndex, endIndex);
+                int curBatchSize = endIndex - beginIndex;
+
+                try {
+                    CompletableFuture.runAsync(() -> {
+                        try {
+                            TableBatchReadSession tableBatchReadSession =
+                                    
createTableBatchReadSession(requiredBatchPartitionSpecs);

Review Comment:
   Looks like in this new implementation, you create read session for each 
partition?
   Is `create read session` a heavy operation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to