hubgeter commented on code in PR #45148:
URL: https://github.com/apache/doris/pull/45148#discussion_r1874356318
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java:
##########
@@ -181,32 +174,118 @@ boolean createTableBatchReadSession() throws UserException {
                 orderedRequiredDataColumns.add(columnName);
             }
         }
+    }
+    /**
+     * For no partition table: request requiredPartitionSpecs is empty
+     * For partition table: if requiredPartitionSpecs is empty, get all partition data.
+     */
+    TableBatchReadSession createTableBatchReadSession(List<PartitionSpec> requiredPartitionSpecs) throws IOException {
+        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
+        readTimeout = mcCatalog.getReadTimeout();
+        connectTimeout = mcCatalog.getConnectTimeout();
+        retryTimes = mcCatalog.getRetryTimes();
+
+        TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
+        return scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName()))
+                .withSettings(mcCatalog.getSettings())
+                .withSplitOptions(mcCatalog.getSplitOption())
+                .requiredPartitionColumns(requiredPartitionColumns)
+                .requiredDataColumns(orderedRequiredDataColumns)
+                .withFilterPredicate(filterPredicate)
+                .requiredPartitions(requiredPartitionSpecs)
+                .withArrowOptions(
+                        ArrowOptions.newBuilder()
+                                .withDatetimeUnit(TimestampUnit.MILLI)
+                                .withTimestampUnit(TimestampUnit.NANO)
+                                .build()
+                ).buildBatchReadSession();
+    }
-        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
+    @Override
+    public boolean isBatchMode() {
+        if (table.getPartitionColumns().isEmpty()) {
+            return false;
+        }
-        try {
-            TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
-            tableBatchReadSession =
-                    scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName()))
-                            .withSettings(mcCatalog.getSettings())
-                            .withSplitOptions(mcCatalog.getSplitOption())
-                            .requiredPartitionColumns(requiredPartitionColumns)
-                            .requiredPartitions(requiredPartitionSpecs)
-                            .requiredDataColumns(orderedRequiredDataColumns)
-                            .withArrowOptions(
-                                    ArrowOptions.newBuilder()
-                                            .withDatetimeUnit(TimestampUnit.MILLI)
-                                            .withTimestampUnit(TimestampUnit.NANO)
-                                            .build()
-                            )
-                            .withFilterPredicate(filterPredicate)
-                            .buildBatchReadSession();
-        } catch (java.io.IOException e) {
-            throw new RuntimeException(e);
+        com.aliyun.odps.Table odpsTable = table.getOdpsTable();
+        if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
+            return false;
         }
-        return true;
+
+        int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+        return numPartitions > 0
+                && selectedPartitions != SelectedPartitions.NOT_PRUNED
+                && selectedPartitions.selectedPartitions.size() >= numPartitions;
+    }
+
+    @Override
+    public int numApproximateSplits() {
+        return selectedPartitions.selectedPartitions.size();
+    }
+
+    @Override
+    public void startSplit() {
+        this.totalPartitionNum = selectedPartitions.totalPartitionNum;
+        this.selectedPartitionNum = selectedPartitions.selectedPartitions.size();
+
+        if (selectedPartitions.selectedPartitions.isEmpty()) {
+            //no need read any partition data.
+            return;
+        }
+
+        createRequiredColumns();
+        List<PartitionSpec> requiredPartitionSpecs = new ArrayList<>();
+        selectedPartitions.selectedPartitions.forEach(
+                (key, value) -> requiredPartitionSpecs.add(new PartitionSpec(key))
+        );
+
+
+        int batchNumPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+
+        Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
+        AtomicReference<UserException> batchException = new AtomicReference<>(null);
+        AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+
+        CompletableFuture.runAsync(() -> {
+            for (int beginIndex = 0; beginIndex < requiredPartitionSpecs.size(); beginIndex += batchNumPartitions) {
+                int endIndex = Math.min(beginIndex + batchNumPartitions, requiredPartitionSpecs.size());
+                if (batchException.get() != null || splitAssignment.isStop()) {
+                    break;
+                }
+                List<PartitionSpec> requiredBatchPartitionSpecs = requiredPartitionSpecs.subList(beginIndex, endIndex);
+                int curBatchSize = endIndex - beginIndex;
+
+                try {
+                    CompletableFuture.runAsync(() -> {
+                        try {
+                            TableBatchReadSession tableBatchReadSession =
+                                    createTableBatchReadSession(requiredBatchPartitionSpecs);

Review Comment:
   This creates a read session for one batch of partitions. `createTableBatchReadSession` performs network I/O, and increasing the number of partitions in a single call makes that I/O very slow: I tested 1500 partitions and it took about 13 seconds.
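For illustration, here is a minimal, self-contained sketch of the batching pattern described in this comment: split the selected partition specs into fixed-size batches and create one read session per batch on a background executor, so that no single session-creation call has to cover thousands of partitions. The types `PartitionSpec`, `ReadSession`, and `createSessionForBatch` below are simplified stand-ins for illustration only, not the actual Doris or MaxCompute SDK classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class PartitionBatchingSketch {

    // Stand-in for a partition spec such as "dt=20240101".
    record PartitionSpec(String spec) {}

    // Stand-in for the read session returned by the SDK.
    record ReadSession(List<PartitionSpec> partitions) {}

    // Stand-in for the expensive, network-bound session creation call.
    static ReadSession createSessionForBatch(List<PartitionSpec> batch) {
        return new ReadSession(List.copyOf(batch));
    }

    public static void main(String[] args) throws Exception {
        List<PartitionSpec> allPartitions = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            allPartitions.add(new PartitionSpec("dt=2024010" + i));
        }

        int batchSize = 3; // plays the role of the batch-mode partition count
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicReference<Exception> firstError = new AtomicReference<>(null);
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        for (int begin = 0; begin < allPartitions.size(); begin += batchSize) {
            int end = Math.min(begin + batchSize, allPartitions.size());
            // Copy the sublist so each async task owns its own batch.
            List<PartitionSpec> batch = List.copyOf(allPartitions.subList(begin, end));
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    ReadSession session = createSessionForBatch(batch);
                    System.out.println("created session for "
                            + session.partitions().size() + " partitions");
                } catch (Exception e) {
                    // Remember the first failure so the caller can stop scheduling more batches.
                    firstError.compareAndSet(null, e);
                }
            }, executor));
        }

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        executor.shutdown();
        if (firstError.get() != null) {
            throw firstError.get();
        }
    }
}
```

In the PR itself, the batch size comes from the `getNumPartitionsInBatchMode()` session variable, the work runs on the executor obtained from `ExtMetaCacheMgr`, and an `AtomicReference<UserException>` is used to surface the first failure back to the split assignment, as shown in the diff above.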