This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 463718b8fb8 [opt](maxcompute) Optimize split generation for LIMIT
queries with partition equality predicates (#60895)
463718b8fb8 is described below
commit 463718b8fb8e08f9b562ed215d516baae9ad72f1
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Mar 3 12:49:45 2026 +0800
[opt](maxcompute) Optimize split generation for LIMIT queries with
partition equality predicates (#60895)
### What problem does this PR solve?
When a MaxCompute query contains only partition equality predicates and
a LIMIT clause, use the row_offset split strategy to read only the
required number of rows instead of generating splits for all data. This
reduces the split count from potentially many to exactly one, improving
query latency for common LIMIT patterns like `SELECT * FROM t WHERE pt='x' LIMIT N`.
Key changes:
- Add `checkOnlyPartitionEqualityPredicate()` to detect eligible queries
- Add `getSplitsWithLimitOptimization()` using SplitByRowOffset with
crossPartition=false, reading min(limit, totalRowCount) rows
- Add session variable `enable_mc_limit_split_optimization` (default: off)
- Add timing logs for split generation phases to aid performance
diagnosis
- Add unit tests for predicate check and limit optimization logic
- Add regression tests covering single/multi-partition tables, JOINs,
aggregations, subqueries, window functions, and edge cases
---
.../maxcompute/source/MaxComputeScanNode.java | 144 ++++++-
.../java/org/apache/doris/qe/SessionVariable.java | 13 +
.../maxcompute/source/MaxComputeScanNodeTest.java | 438 +++++++++++++++++++++
.../write/test_mc_limit_split_optimization.groovy | 266 +++++++++++++
4 files changed, 857 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 5f73f5cab8d..898022b2dcf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -54,6 +54,7 @@ import com.aliyun.odps.OdpsType;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit;
+import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.optimizer.predicate.Predicate;
import com.aliyun.odps.table.read.TableBatchReadSession;
import com.aliyun.odps.table.read.TableReadSessionBuilder;
@@ -100,6 +101,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
private int readTimeout;
private int retryTimes;
+ private boolean onlyPartitionEqualityPredicate = false;
+
@Setter
private SelectedPartitions selectedPartitions = null;
@@ -177,6 +180,12 @@ public class MaxComputeScanNode extends FileQueryScanNode {
*/
TableBatchReadSession createTableBatchReadSession(List<PartitionSpec>
requiredPartitionSpecs) throws IOException {
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog)
table.getCatalog();
+ return createTableBatchReadSession(requiredPartitionSpecs,
mcCatalog.getSplitOption());
+ }
+
+ TableBatchReadSession createTableBatchReadSession(
+ List<PartitionSpec> requiredPartitionSpecs, SplitOptions
splitOptions) throws IOException {
+ MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog)
table.getCatalog();
readTimeout = mcCatalog.getReadTimeout();
connectTimeout = mcCatalog.getConnectTimeout();
@@ -186,7 +195,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
return scanBuilder.identifier(table.getTableIdentifier())
.withSettings(mcCatalog.getSettings())
- .withSplitOptions(mcCatalog.getSplitOption())
+ .withSplitOptions(splitOptions)
.requiredPartitionColumns(requiredPartitionColumns)
.requiredDataColumns(orderedRequiredDataColumns)
.withFilterPredicate(filterPredicate)
@@ -315,6 +324,51 @@ public class MaxComputeScanNode extends FileQueryScanNode {
}
this.filterPredicate = filterPredicate;
}
+
+ this.onlyPartitionEqualityPredicate =
checkOnlyPartitionEqualityPredicate();
+ }
+
+ private boolean checkOnlyPartitionEqualityPredicate() {
+ if (conjuncts.isEmpty()) {
+ return true;
+ }
+ Set<String> partitionColumns =
+
table.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet());
+ for (Expr expr : conjuncts) {
+ if (expr instanceof BinaryPredicate) {
+ BinaryPredicate bp = (BinaryPredicate) expr;
+ if (bp.getOp() != BinaryPredicate.Operator.EQ) {
+ return false;
+ }
+ if (!(bp.getChild(0) instanceof SlotRef) || !(bp.getChild(1)
instanceof LiteralExpr)) {
+ return false;
+ }
+ String colName = ((SlotRef) bp.getChild(0)).getColumnName();
+ if (!partitionColumns.contains(colName)) {
+ return false;
+ }
+ } else if (expr instanceof InPredicate) {
+ InPredicate inPredicate = (InPredicate) expr;
+ if (inPredicate.isNotIn()) {
+ return false;
+ }
+ if (!(inPredicate.getChild(0) instanceof SlotRef)) {
+ return false;
+ }
+ String colName = ((SlotRef)
inPredicate.getChild(0)).getColumnName();
+ if (!partitionColumns.contains(colName)) {
+ return false;
+ }
+ for (int i = 1; i < inPredicate.getChildren().size(); i++) {
+ if (!(inPredicate.getChild(i) instanceof LiteralExpr)) {
+ return false;
+ }
+ }
+ } else {
+ return false;
+ }
+ }
+ return true;
}
private Predicate convertExprToOdpsPredicate(Expr expr) throws
AnalysisException {
@@ -576,14 +630,23 @@ public class MaxComputeScanNode extends FileQueryScanNode
{
private List<Split> getSplitByTableSession(TableBatchReadSession
tableBatchReadSession) throws IOException {
List<Split> result = new ArrayList<>();
+
+ long t0 = System.currentTimeMillis();
String scanSessionSerialize = serializeSession(tableBatchReadSession);
+ long t1 = System.currentTimeMillis();
+ LOG.info("MaxComputeScanNode getSplitByTableSession: serializeSession
cost {} ms, "
+ + "serialized size: {} bytes", t1 - t0,
scanSessionSerialize.length());
+
InputSplitAssigner assigner =
tableBatchReadSession.getInputSplitAssigner();
+ long t2 = System.currentTimeMillis();
+ LOG.info("MaxComputeScanNode getSplitByTableSession:
getInputSplitAssigner cost {} ms", t2 - t1);
+
long modificationTime =
table.getOdpsTable().getLastDataModifiedTime().getTime();
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog)
table.getCatalog();
if
(mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY))
{
-
+ long t3 = System.currentTimeMillis();
for (com.aliyun.odps.table.read.split.InputSplit split :
assigner.getAllSplits()) {
MaxComputeSplit maxComputeSplit =
new MaxComputeSplit(BYTE_SIZE_PATH,
@@ -599,7 +662,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
result.add(maxComputeSplit);
}
+ LOG.info("MaxComputeScanNode getSplitByTableSession: byte_size
getAllSplits+build cost {} ms, "
+ + "splits size: {}", System.currentTimeMillis() - t3,
result.size());
} else {
+ long t3 = System.currentTimeMillis();
long totalRowCount = assigner.getTotalRowCount();
long recordsPerSplit = mcCatalog.getSplitRowCount();
@@ -619,17 +685,27 @@ public class MaxComputeScanNode extends FileQueryScanNode
{
result.add(maxComputeSplit);
}
+ LOG.info("MaxComputeScanNode getSplitByTableSession: row_offset
getSplitByRowOffset+build cost {} ms, "
+ + "splits size: {}, totalRowCount: {}",
System.currentTimeMillis() - t3, result.size(),
+ totalRowCount);
}
+
return result;
}
@Override
public List<Split> getSplits(int numBackends) throws UserException {
+ long startTime = System.currentTimeMillis();
List<Split> result = new ArrayList<>();
com.aliyun.odps.Table odpsTable = table.getOdpsTable();
+ long getOdpsTableTime = System.currentTimeMillis();
+ LOG.info("MaxComputeScanNode getSplits: getOdpsTable cost {} ms",
getOdpsTableTime - startTime);
+
if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
return result;
}
+ long getFileNumTime = System.currentTimeMillis();
+ LOG.info("MaxComputeScanNode getSplits: getFileNum cost {} ms",
getFileNumTime - getOdpsTableTime);
createRequiredColumns();
@@ -649,11 +725,71 @@ public class MaxComputeScanNode extends FileQueryScanNode
{
}
try {
- TableBatchReadSession tableBatchReadSession =
createTableBatchReadSession(requiredPartitionSpecs);
- result = getSplitByTableSession(tableBatchReadSession);
+ long beforeSession = System.currentTimeMillis();
+ if (sessionVariable.enableMcLimitSplitOptimization
+ && onlyPartitionEqualityPredicate && hasLimit()) {
+ result =
getSplitsWithLimitOptimization(requiredPartitionSpecs);
+ } else {
+ TableBatchReadSession tableBatchReadSession =
createTableBatchReadSession(requiredPartitionSpecs);
+ long afterSession = System.currentTimeMillis();
+ LOG.info("MaxComputeScanNode getSplits:
createTableBatchReadSession cost {} ms, "
+ + "partitionSpecs size: {}", afterSession -
beforeSession, requiredPartitionSpecs.size());
+
+ result = getSplitByTableSession(tableBatchReadSession);
+ long afterSplit = System.currentTimeMillis();
+ LOG.info("MaxComputeScanNode getSplits: getSplitByTableSession
cost {} ms, "
+ + "splits size: {}", afterSplit - afterSession,
result.size());
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
+ LOG.info("MaxComputeScanNode getSplits: total cost {} ms",
System.currentTimeMillis() - startTime);
+ return result;
+ }
+
+ private List<Split> getSplitsWithLimitOptimization(
+ List<PartitionSpec> requiredPartitionSpecs) throws IOException {
+ long startTime = System.currentTimeMillis();
+
+ SplitOptions rowOffsetOptions = SplitOptions.newBuilder()
+ .SplitByRowOffset()
+ .withCrossPartition(false)
+ .build();
+
+ TableBatchReadSession tableBatchReadSession =
+ createTableBatchReadSession(requiredPartitionSpecs,
rowOffsetOptions);
+ long afterSession = System.currentTimeMillis();
+ LOG.info("MaxComputeScanNode getSplitsWithLimitOptimization: "
+ + "createTableBatchReadSession cost {} ms", afterSession -
startTime);
+
+ String scanSessionSerialize = serializeSession(tableBatchReadSession);
+ InputSplitAssigner assigner =
tableBatchReadSession.getInputSplitAssigner();
+ long totalRowCount = assigner.getTotalRowCount();
+
+ LOG.info("MaxComputeScanNode getSplitsWithLimitOptimization: "
+ + "totalRowCount={}, limit={}", totalRowCount, getLimit());
+
+ List<Split> result = new ArrayList<>();
+ if (totalRowCount <= 0) {
+ return result;
+ }
+
+ long rowsToRead = Math.min(getLimit(), totalRowCount);
+ long modificationTime =
table.getOdpsTable().getLastDataModifiedTime().getTime();
+ com.aliyun.odps.table.read.split.InputSplit split =
+ assigner.getSplitByRowOffset(0, rowsToRead);
+
+ MaxComputeSplit maxComputeSplit = new MaxComputeSplit(
+ ROW_OFFSET_PATH, 0, rowsToRead, totalRowCount,
+ modificationTime, null, Collections.emptyList());
+ maxComputeSplit.scanSerialize = scanSessionSerialize;
+ maxComputeSplit.splitType = SplitType.ROW_OFFSET;
+ maxComputeSplit.sessionId = split.getSessionId();
+ result.add(maxComputeSplit);
+
+ LOG.info("MaxComputeScanNode getSplitsWithLimitOptimization: "
+ + "total cost {} ms, 1 split with {} rows",
+ System.currentTimeMillis() - startTime, rowsToRead);
return result;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 93286a8a330..362b69533d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2787,6 +2787,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String IGNORE_RUNTIME_FILTER_IDS =
"ignore_runtime_filter_ids";
public static final String ENABLE_EXTERNAL_TABLE_BATCH_MODE =
"enable_external_table_batch_mode";
+
+ public static final String ENABLE_MC_LIMIT_SPLIT_OPTIMIZATION =
"enable_mc_limit_split_optimization";
@VariableMgr.VarAttr(
name = ENABLE_EXTERNAL_TABLE_BATCH_MODE,
fuzzy = true,
@@ -2794,6 +2796,17 @@ public class SessionVariable implements Serializable,
Writable {
needForward = true)
public boolean enableExternalTableBatchMode = true;
+ @VariableMgr.VarAttr(
+ name = ENABLE_MC_LIMIT_SPLIT_OPTIMIZATION,
+ fuzzy = true,
+ description = {"开启 MaxCompute 表 LIMIT 查询的 split 优化。当查询仅包含分区等值条件且带有
LIMIT 时,"
+ + "使用 row_offset 策略减少 split 数量以加速查询。",
+ "Enable split optimization for LIMIT queries on MaxCompute
tables. "
+ + "When the query contains only partition equality
predicates with LIMIT, "
+ + "use row_offset strategy to reduce split count for
faster query execution."},
+ needForward = true)
+ public boolean enableMcLimitSplitOptimization = false;
+
@VariableMgr.VarAttr(name = SKEW_REWRITE_AGG_BUCKET_NUM, needForward =
true,
description = {"bucketNum 参数控制 count(distinct) 倾斜优化的数据分布。决定不同值在
worker 间的分配方式,"
+ "值越大越能处理极端倾斜但增加 shuffle 开销,值越小网络开销越低但可能无法完全解决倾斜。",
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java
new file mode 100644
index 00000000000..b00adb1a2e6
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java
@@ -0,0 +1,438 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.maxcompute.source;
+
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
+import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
+import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
+import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
+import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.spi.Split;
+
+import com.aliyun.odps.table.DataFormat;
+import com.aliyun.odps.table.DataSchema;
+import com.aliyun.odps.table.SessionStatus;
+import com.aliyun.odps.table.TableIdentifier;
+import com.aliyun.odps.table.read.TableBatchReadSession;
+import com.aliyun.odps.table.read.split.InputSplitAssigner;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MaxComputeScanNodeTest {
+
+ @Mock
+ private MaxComputeExternalTable table;
+
+ @Mock
+ private MaxComputeExternalCatalog catalog;
+
+ @Mock
+ private com.aliyun.odps.Table odpsTable;
+
+ private SessionVariable sv;
+ private TupleDescriptor desc;
+ private MaxComputeScanNode node;
+
+ private List<Column> partitionColumns;
+
+ @Before
+ public void setUp() {
+ partitionColumns = Arrays.asList(
+ new Column("dt", PrimitiveType.VARCHAR),
+ new Column("hr", PrimitiveType.VARCHAR)
+ );
+ Mockito.when(table.getPartitionColumns()).thenReturn(partitionColumns);
+ Mockito.when(table.getCatalog()).thenReturn(catalog);
+ Mockito.when(table.getOdpsTable()).thenReturn(odpsTable);
+
+ desc = Mockito.mock(TupleDescriptor.class);
+ Mockito.when(desc.getTable()).thenReturn(table);
+ Mockito.when(desc.getId()).thenReturn(new TupleId(0));
+ Mockito.when(desc.getSlots()).thenReturn(new ArrayList<>());
+
+ sv = new SessionVariable();
+ node = new MaxComputeScanNode(new PlanNodeId(0), desc,
+ SelectedPartitions.NOT_PRUNED, false, sv);
+ }
+
+ // ==================== Reflection Helpers ====================
+
+ private void setConjuncts(PlanNode target, List<Expr> conjuncts) throws
Exception {
+ Field f = PlanNode.class.getDeclaredField("conjuncts");
+ f.setAccessible(true);
+ f.set(target, conjuncts);
+ }
+
+ private void setLimit(PlanNode target, long limit) throws Exception {
+ Field f = PlanNode.class.getDeclaredField("limit");
+ f.setAccessible(true);
+ f.setLong(target, limit);
+ }
+
+ private void setOnlyPartitionEqualityPredicate(MaxComputeScanNode target,
boolean value) throws Exception {
+ Field f =
MaxComputeScanNode.class.getDeclaredField("onlyPartitionEqualityPredicate");
+ f.setAccessible(true);
+ f.setBoolean(target, value);
+ }
+
+ private boolean
invokeCheckOnlyPartitionEqualityPredicate(MaxComputeScanNode target) throws
Exception {
+ Method m =
MaxComputeScanNode.class.getDeclaredMethod("checkOnlyPartitionEqualityPredicate");
+ m.setAccessible(true);
+ return (boolean) m.invoke(target);
+ }
+
+ // ==================== Group 1: checkOnlyPartitionEqualityPredicate
====================
+
+ @Test
+ public void testCheckOnlyPartEq_emptyConjuncts() throws Exception {
+ setConjuncts(node, new ArrayList<>());
+ Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_singlePartitionEquality() throws Exception
{
+ SlotRef dtSlot = new SlotRef(null, "dt");
+ StringLiteral val = new StringLiteral("2026-02-26");
+ BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ,
dtSlot, val);
+ setConjuncts(node, Lists.newArrayList(eq));
+ Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_multiPartitionEquality() throws Exception {
+ SlotRef dtSlot = new SlotRef(null, "dt");
+ SlotRef hrSlot = new SlotRef(null, "hr");
+ BinaryPredicate eq1 = new BinaryPredicate(BinaryPredicate.Operator.EQ,
dtSlot, new StringLiteral("x"));
+ BinaryPredicate eq2 = new BinaryPredicate(BinaryPredicate.Operator.EQ,
hrSlot, new StringLiteral("10"));
+ setConjuncts(node, Lists.newArrayList(eq1, eq2));
+ Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_nonPartitionColumn() throws Exception {
+ SlotRef statusSlot = new SlotRef(null, "status");
+ BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ,
statusSlot, new StringLiteral("active"));
+ setConjuncts(node, Lists.newArrayList(eq));
+ Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_nonEqOperator() throws Exception {
+ SlotRef dtSlot = new SlotRef(null, "dt");
+ BinaryPredicate gt = new BinaryPredicate(BinaryPredicate.Operator.GT,
dtSlot, new StringLiteral("2026-01-01"));
+ setConjuncts(node, Lists.newArrayList(gt));
+ Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_inPredicateOnPartitionColumn() throws
Exception {
+ SlotRef dtSlot = new SlotRef(null, "dt");
+ List<Expr> inList = Lists.newArrayList(new StringLiteral("a"), new
StringLiteral("b"));
+ InPredicate inPred = new InPredicate(dtSlot, inList, false);
+ setConjuncts(node, Lists.newArrayList(inPred));
+ Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_notInPredicate() throws Exception {
+ SlotRef dtSlot = new SlotRef(null, "dt");
+ List<Expr> inList = Lists.newArrayList(new StringLiteral("a"), new
StringLiteral("b"));
+ InPredicate notInPred = new InPredicate(dtSlot, inList, true);
+ setConjuncts(node, Lists.newArrayList(notInPred));
+ Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_inPredicateOnNonPartitionColumn() throws
Exception {
+ SlotRef statusSlot = new SlotRef(null, "status");
+ List<Expr> inList = Lists.newArrayList(new StringLiteral("a"), new
StringLiteral("b"));
+ InPredicate inPred = new InPredicate(statusSlot, inList, false);
+ setConjuncts(node, Lists.newArrayList(inPred));
+ Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_inPredicateWithNonLiteralValue() throws
Exception {
+ SlotRef dtSlot = new SlotRef(null, "dt");
+ SlotRef hrSlot = new SlotRef(null, "hr");
+ List<Expr> inList = Lists.newArrayList(hrSlot);
+ InPredicate inPred = new InPredicate(dtSlot, inList, false);
+ setConjuncts(node, Lists.newArrayList(inPred));
+ Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_mixedEqAndInOnPartitionColumns() throws
Exception {
+ SlotRef dtSlot = new SlotRef(null, "dt");
+ BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ,
dtSlot, new StringLiteral("2026-01-01"));
+
+ SlotRef hrSlot = new SlotRef(null, "hr");
+ List<Expr> inList = Lists.newArrayList(new StringLiteral("10"), new
StringLiteral("11"));
+ InPredicate inPred = new InPredicate(hrSlot, inList, false);
+
+ setConjuncts(node, Lists.newArrayList(eq, inPred));
+ Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_leftSideNotSlotRef() throws Exception {
+ StringLiteral left = new StringLiteral("x");
+ StringLiteral right = new StringLiteral("x");
+ BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ,
left, right);
+ setConjuncts(node, Lists.newArrayList(eq));
+ Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ @Test
+ public void testCheckOnlyPartEq_rightSideNotLiteral() throws Exception {
+ SlotRef dtSlot = new SlotRef(null, "dt");
+ SlotRef hrSlot = new SlotRef(null, "hr");
+ BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ,
dtSlot, hrSlot);
+ setConjuncts(node, Lists.newArrayList(eq));
+ Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node));
+ }
+
+ // ==================== Serializable Stub for TableBatchReadSession
====================
+
+ private static class StubTableBatchReadSession implements
TableBatchReadSession {
+ private static final long serialVersionUID = 1L;
+ private transient InputSplitAssigner assigner;
+
+ StubTableBatchReadSession(InputSplitAssigner assigner) {
+ this.assigner = assigner;
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner() throws IOException {
+ return assigner;
+ }
+
+ @Override
+ public DataSchema readSchema() {
+ return null;
+ }
+
+ @Override
+ public boolean supportsDataFormat(DataFormat dataFormat) {
+ return false;
+ }
+
+ @Override
+ public String getId() {
+ return "stub-session";
+ }
+
+ @Override
+ public TableIdentifier getTableIdentifier() {
+ return null;
+ }
+
+ @Override
+ public SessionStatus getStatus() {
+ return SessionStatus.NORMAL;
+ }
+
+ @Override
+ public String toJson() {
+ return "{}";
+ }
+ }
+
+ // ==================== Mock Session Helper ====================
+
+ private MaxComputeScanNode createSpyNodeWithMockSession(long
totalRowCount) throws Exception {
+ MaxComputeScanNode spyNode = Mockito.spy(node);
+
+ InputSplitAssigner mockAssigner =
Mockito.mock(InputSplitAssigner.class);
+ com.aliyun.odps.table.read.split.InputSplit mockInputSplit =
+
Mockito.mock(com.aliyun.odps.table.read.split.InputSplit.class);
+
+
Mockito.when(mockAssigner.getTotalRowCount()).thenReturn(totalRowCount);
+ Mockito.when(mockAssigner.getSplitByRowOffset(Mockito.anyLong(),
Mockito.anyLong()))
+ .thenReturn(mockInputSplit);
+
Mockito.when(mockInputSplit.getSessionId()).thenReturn("test-session-id");
+
+ StubTableBatchReadSession stubSession = new
StubTableBatchReadSession(mockAssigner);
+
+ Mockito.doReturn(stubSession).when(spyNode)
+ .createTableBatchReadSession(Mockito.anyList(), Mockito.any(
+
com.aliyun.odps.table.configuration.SplitOptions.class));
+ Mockito.doReturn(stubSession).when(spyNode)
+ .createTableBatchReadSession(Mockito.anyList());
+
+ Mockito.when(odpsTable.getLastDataModifiedTime()).thenReturn(new
Date(1000L));
+
+ return spyNode;
+ }
+
+ // ==================== Group 2: getSplitsWithLimitOptimization
====================
+
+ private List<Split> invokeGetSplitsWithLimitOptimization(
+ MaxComputeScanNode target) throws Exception {
+ Method m = MaxComputeScanNode.class.getDeclaredMethod(
+ "getSplitsWithLimitOptimization", List.class);
+ m.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ List<Split> result = (List<Split>) m.invoke(target,
Collections.emptyList());
+ return result;
+ }
+
+ @Test
+ public void testLimitOpt_limitLessThanTotal() throws Exception {
+ MaxComputeScanNode spyNode = createSpyNodeWithMockSession(10000L);
+ setLimit(spyNode, 100L);
+
+ List<Split> result = invokeGetSplitsWithLimitOptimization(spyNode);
+
+ Assert.assertEquals(1, result.size());
+ MaxComputeSplit split = (MaxComputeSplit) result.get(0);
+ Assert.assertEquals(SplitType.ROW_OFFSET, split.splitType);
+ Assert.assertEquals(100L, split.getLength());
+ }
+
+ @Test
+ public void testLimitOpt_limitGreaterThanTotal() throws Exception {
+ MaxComputeScanNode spyNode = createSpyNodeWithMockSession(200L);
+ setLimit(spyNode, 50000L);
+
+ List<Split> result = invokeGetSplitsWithLimitOptimization(spyNode);
+
+ Assert.assertEquals(1, result.size());
+ MaxComputeSplit split = (MaxComputeSplit) result.get(0);
+ Assert.assertEquals(SplitType.ROW_OFFSET, split.splitType);
+ Assert.assertEquals(200L, split.getLength());
+ }
+
+ @Test
+ public void testLimitOpt_totalRowCountZero() throws Exception {
+ MaxComputeScanNode spyNode = createSpyNodeWithMockSession(0L);
+ setLimit(spyNode, 100L);
+
+ List<Split> result = invokeGetSplitsWithLimitOptimization(spyNode);
+
+ Assert.assertTrue(result.isEmpty());
+ }
+
+ // ==================== Group 3: getSplits gating conditions
====================
+
+ private MaxComputeScanNode createSpyNodeForGetSplits(long totalRowCount)
throws Exception {
+ // Need non-empty slots so getSplits doesn't return early
+ SlotDescriptor mockSlotDesc = Mockito.mock(SlotDescriptor.class);
+ Column dataCol = new Column("value", PrimitiveType.VARCHAR);
+ Mockito.when(mockSlotDesc.getColumn()).thenReturn(dataCol);
+
Mockito.when(desc.getSlots()).thenReturn(Lists.newArrayList(mockSlotDesc));
+
+ // Need fileNum > 0
+ Mockito.when(odpsTable.getFileNum()).thenReturn(10L);
+
+ // For normal path: use row_count strategy
+ Mockito.when(catalog.getSplitStrategy()).thenReturn("row_count");
+ Mockito.when(catalog.getSplitRowCount()).thenReturn(totalRowCount);
+
+ // Need table.getColumns() for createRequiredColumns()
+ List<Column> allColumns = Lists.newArrayList(
+ new Column("dt", PrimitiveType.VARCHAR),
+ new Column("hr", PrimitiveType.VARCHAR),
+ new Column("value", PrimitiveType.VARCHAR)
+ );
+ Mockito.when(table.getColumns()).thenReturn(allColumns);
+
+ return createSpyNodeWithMockSession(totalRowCount);
+ }
+
+ @Test
+ public void testGetSplits_allConditionsMet_optimizationPath() throws
Exception {
+ MaxComputeScanNode spyNode = createSpyNodeForGetSplits(10000L);
+ sv.enableMcLimitSplitOptimization = true;
+ setOnlyPartitionEqualityPredicate(spyNode, true);
+ setLimit(spyNode, 100L);
+
+ List<Split> result = spyNode.getSplits(1);
+
+ Assert.assertEquals(1, result.size());
+ MaxComputeSplit split = (MaxComputeSplit) result.get(0);
+ Assert.assertEquals(SplitType.ROW_OFFSET, split.splitType);
+ Assert.assertEquals(100L, split.getLength());
+ }
+
+ @Test
+ public void testGetSplits_optimizationDisabled_normalPath() throws
Exception {
+ MaxComputeScanNode spyNode = createSpyNodeForGetSplits(1000L);
+ sv.enableMcLimitSplitOptimization = false;
+ setOnlyPartitionEqualityPredicate(spyNode, true);
+ setLimit(spyNode, 100L);
+
+ List<Split> result = spyNode.getSplits(1);
+
+ // Normal path with row_count strategy: totalRowCount=1000,
splitRowCount=1000 → 1 split
+ // but the split length equals splitRowCount, not limit
+ Assert.assertFalse(result.isEmpty());
+ }
+
+ @Test
+ public void testGetSplits_nonPartitionPredicate_normalPath() throws
Exception {
+ MaxComputeScanNode spyNode = createSpyNodeForGetSplits(1000L);
+ sv.enableMcLimitSplitOptimization = true;
+ setOnlyPartitionEqualityPredicate(spyNode, false);
+ setLimit(spyNode, 100L);
+
+ List<Split> result = spyNode.getSplits(1);
+
+ Assert.assertFalse(result.isEmpty());
+ }
+
+ @Test
+ public void testGetSplits_noLimit_normalPath() throws Exception {
+ MaxComputeScanNode spyNode = createSpyNodeForGetSplits(1000L);
+ sv.enableMcLimitSplitOptimization = true;
+ setOnlyPartitionEqualityPredicate(spyNode, true);
+ // limit defaults to -1 (no limit), don't set it
+
+ List<Split> result = spyNode.getSplits(1);
+
+ Assert.assertFalse(result.isEmpty());
+ }
+}
diff --git
a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy
new file mode 100644
index 00000000000..e2e4972e36a
--- /dev/null
+++
b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
// Regression test for the MaxCompute LIMIT split optimization
// (session variable enable_mc_limit_split_optimization).
// Strategy: run each query with the optimization ON and OFF and assert the
// two result sets are identical, then spot-check known row counts with the
// optimization enabled.
// Fixes over the initial version: the session variable reset and catalog
// cleanup now live in `finally`, so a failed assertion can no longer leak
// the enabled variable or the temporary catalog into later suites.
suite("test_mc_limit_split_optimization", "p2,external,maxcompute,external_remote,external_remote_maxcompute") {
    String enabled = context.config.otherConfigs.get("enableMaxComputeTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("disable MaxCompute test.")
        return
    }

    String ak = context.config.otherConfigs.get("ak")
    String sk = context.config.otherConfigs.get("sk")
    String mc_catalog_name = "test_mc_limit_split_opt"
    String defaultProject = "mc_doris_test_write"

    sql """drop catalog if exists ${mc_catalog_name}"""
    sql """
        CREATE CATALOG IF NOT EXISTS ${mc_catalog_name} PROPERTIES (
            "type" = "max_compute",
            "mc.default.project" = "${defaultProject}",
            "mc.access_key" = "${ak}",
            "mc.secret_key" = "${sk}",
            "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api",
            "mc.quota" = "pay-as-you-go",
            "mc.enable.namespace.schema" = "true"
        );
    """

    sql """switch ${mc_catalog_name}"""

    // Unique suffix so concurrent runs do not collide on database/table names.
    def uuid = UUID.randomUUID().toString().replace("-", "").substring(0, 8)
    String db = "mc_limit_opt_${uuid}"

    sql """drop database if exists ${db}"""
    sql """create database ${db}"""
    sql """use ${db}"""

    try {
        // ==================== Step 1: Create partition table and insert data ====================
        String tb = "many_part_${uuid}"
        sql """DROP TABLE IF EXISTS ${tb}"""
        sql """
        CREATE TABLE ${tb} (
            id INT,
            name STRING,
            pt STRING
        )
        PARTITION BY (pt)()
        """

        // Generate 50 partitions (pt = 1..50), each with 20 rows = 1000 rows total.
        // id = c1 * 50 + c2 + 1 is unique across the cross product.
        sql """
        INSERT INTO ${tb}
        SELECT
            c1 * 50 + c2 + 1 AS id,
            CONCAT('name_', CAST(c1 * 50 + c2 + 1 AS STRING)) AS name,
            CAST(c2 + 1 AS STRING) AS pt
        FROM (SELECT 1) t
        LATERAL VIEW EXPLODE_NUMBERS(20) t1 AS c1
        LATERAL VIEW EXPLODE_NUMBERS(50) t2 AS c2
        """

        // Verify total row count
        def totalCount = sql """SELECT count(*) FROM ${tb}"""
        logger.info("Total rows inserted: ${totalCount}")
        assert totalCount[0][0] == 1000

        // ==================== Step 2: Multi-partition column table ====================
        String tb2 = "multi_part_${uuid}"
        sql """DROP TABLE IF EXISTS ${tb2}"""
        sql """
        CREATE TABLE ${tb2} (
            id INT,
            val STRING,
            dt STRING,
            region STRING
        )
        PARTITION BY (dt, region)()
        """

        // 5 dt x 4 region = 20 partitions, 5 rows each = 100 rows.
        sql """
        INSERT INTO ${tb2}
        SELECT
            c1 * 20 + c2 * 4 + c3 + 1 AS id,
            CONCAT('val_', CAST(c1 * 20 + c2 * 4 + c3 + 1 AS STRING)) AS val,
            CONCAT('2026-01-0', CAST(c2 + 1 AS STRING)) AS dt,
            CONCAT('r', CAST(c3 + 1 AS STRING)) AS region
        FROM (SELECT 1) t
        LATERAL VIEW EXPLODE_NUMBERS(5) t1 AS c1
        LATERAL VIEW EXPLODE_NUMBERS(5) t2 AS c2
        LATERAL VIEW EXPLODE_NUMBERS(4) t3 AS c3
        """

        def totalCount2 = sql """SELECT count(*) FROM ${tb2}"""
        logger.info("Multi-part total rows: ${totalCount2}")
        assert totalCount2[0][0] == 100

        // ==================== Step 3: Test single-partition table ====================
        // Helper: run query with opt ON and OFF, assert results are equal.
        def compareWithAndWithoutOpt = { String queryLabel, String query ->
            sql """set enable_mc_limit_split_optimization = true"""
            def resultOn = sql "${query}"
            sql """set enable_mc_limit_split_optimization = false"""
            def resultOff = sql "${query}"
            logger.info("${queryLabel}: opt_on=${resultOn.size()} rows, opt_off=${resultOff.size()} rows")
            assert resultOn == resultOff : "${queryLabel}: results differ between opt ON and OFF"
        }

        // --- Case 1: partition equality + LIMIT (optimization should kick in) ---
        compareWithAndWithoutOpt("case1_part_eq_limit",
            "SELECT * FROM ${tb} WHERE pt = '1' ORDER BY id LIMIT 5")

        // --- Case 2: partition equality + small LIMIT ---
        compareWithAndWithoutOpt("case2_part_eq_limit1",
            "SELECT * FROM ${tb} WHERE pt = '10' ORDER BY id LIMIT 1")

        // --- Case 3: partition equality + LIMIT larger than partition data ---
        compareWithAndWithoutOpt("case3_part_eq_limit_large",
            "SELECT * FROM ${tb} WHERE pt = '5' ORDER BY id LIMIT 1000")

        // --- Case 4: no predicate + LIMIT (empty conjuncts -> optimization eligible) ---
        compareWithAndWithoutOpt("case4_no_pred_limit",
            "SELECT * FROM ${tb} ORDER BY id LIMIT 10")

        // --- Case 5: non-partition predicate + LIMIT (optimization should NOT kick in) ---
        compareWithAndWithoutOpt("case5_non_part_pred_limit",
            "SELECT * FROM ${tb} WHERE name = 'name_1' ORDER BY id LIMIT 5")

        // --- Case 6: partition equality + non-partition predicate + LIMIT (mixed -> no opt) ---
        compareWithAndWithoutOpt("case6_mixed_pred_limit",
            "SELECT * FROM ${tb} WHERE pt = '1' AND name = 'name_1' ORDER BY id LIMIT 5")

        // --- Case 7: partition range predicate + LIMIT (non-equality -> no opt) ---
        compareWithAndWithoutOpt("case7_range_pred_limit",
            "SELECT * FROM ${tb} WHERE pt > '3' ORDER BY id LIMIT 10")

        // --- Case 8: partition IN predicate + LIMIT (optimization should kick in) ---
        compareWithAndWithoutOpt("case8_in_pred_limit",
            "SELECT * FROM ${tb} WHERE pt IN ('1', '2', '3') ORDER BY id LIMIT 10")

        // --- Case 8b: partition NOT IN predicate + LIMIT (no opt) ---
        compareWithAndWithoutOpt("case8b_not_in_pred_limit",
            "SELECT * FROM ${tb} WHERE pt NOT IN ('1', '2') ORDER BY id LIMIT 10")

        // --- Case 8c: partition IN + partition EQ mixed + LIMIT (opt eligible) ---
        compareWithAndWithoutOpt("case8c_in_and_eq_limit",
            "SELECT * FROM ${tb2} WHERE dt IN ('2026-01-01', '2026-01-02') AND region = 'r1' ORDER BY id LIMIT 5")

        // --- Case 9: partition equality + no LIMIT (no opt) ---
        compareWithAndWithoutOpt("case9_part_eq_no_limit",
            "SELECT * FROM ${tb} WHERE pt = '1' ORDER BY id")

        // --- Case 10: count(*) with partition equality + LIMIT ---
        compareWithAndWithoutOpt("case10_count_part_eq_limit",
            "SELECT count(*) FROM ${tb} WHERE pt = '1' LIMIT 1")

        // ==================== Step 4: Test multi-partition column table ====================

        // --- Case 11: both partition columns equality + LIMIT (opt eligible) ---
        compareWithAndWithoutOpt("case11_multi_part_eq_limit",
            "SELECT * FROM ${tb2} WHERE dt = '2026-01-01' AND region = 'r1' ORDER BY id LIMIT 3")

        // --- Case 12: single partition column equality + LIMIT (opt eligible) ---
        compareWithAndWithoutOpt("case12_single_part_eq_limit",
            "SELECT * FROM ${tb2} WHERE dt = '2026-01-03' ORDER BY id LIMIT 5")

        // --- Case 13: partition equality + non-partition predicate + LIMIT (no opt) ---
        compareWithAndWithoutOpt("case13_multi_mixed_limit",
            "SELECT * FROM ${tb2} WHERE dt = '2026-01-01' AND val = 'val_1' ORDER BY id LIMIT 5")

        // --- Case 14: no predicate + LIMIT on multi-part table ---
        compareWithAndWithoutOpt("case14_multi_no_pred_limit",
            "SELECT * FROM ${tb2} ORDER BY id LIMIT 10")

        // --- Case 15: partition range on multi-part + LIMIT (no opt) ---
        compareWithAndWithoutOpt("case15_multi_range_limit",
            "SELECT * FROM ${tb2} WHERE dt >= '2026-01-03' ORDER BY id LIMIT 10")

        // ==================== Step 4b: Complex queries (JOIN / aggregation / subquery) ====================

        // --- Case 16: self-JOIN with partition equality + LIMIT ---
        compareWithAndWithoutOpt("case16_self_join_limit",
            "SELECT a.id, a.name, b.name AS name2 FROM ${tb} a JOIN ${tb} b ON a.id = b.id WHERE a.pt = '1' ORDER BY a.id LIMIT 5")

        // --- Case 17: JOIN between two MC tables with partition equality + LIMIT ---
        compareWithAndWithoutOpt("case17_cross_table_join_limit",
            "SELECT a.id, a.name, b.val FROM ${tb} a JOIN ${tb2} b ON a.id = b.id WHERE a.pt = '1' AND b.dt = '2026-01-01' ORDER BY a.id LIMIT 5")

        // --- Case 18: SUM + GROUP BY with partition equality + LIMIT ---
        compareWithAndWithoutOpt("case18_sum_group_limit",
            "SELECT pt, SUM(id) AS total_id, COUNT(*) AS cnt FROM ${tb} WHERE pt = '1' GROUP BY pt LIMIT 5")

        // --- Case 19: aggregation across all partitions + LIMIT ---
        compareWithAndWithoutOpt("case19_agg_all_part_limit",
            "SELECT pt, COUNT(*) AS cnt, MIN(id) AS min_id, MAX(id) AS max_id FROM ${tb} GROUP BY pt ORDER BY pt LIMIT 10")

        // --- Case 20: SUM + GROUP BY on multi-part table with partition equality + LIMIT ---
        compareWithAndWithoutOpt("case20_multi_part_agg_limit",
            "SELECT dt, region, SUM(id) AS total_id FROM ${tb2} WHERE dt = '2026-01-01' GROUP BY dt, region ORDER BY region LIMIT 5")

        // --- Case 21: subquery with partition equality + LIMIT ---
        compareWithAndWithoutOpt("case21_subquery_limit",
            "SELECT * FROM (SELECT id, name, pt FROM ${tb} WHERE pt = '2') sub ORDER BY id LIMIT 5")

        // --- Case 22: JOIN + aggregation + LIMIT ---
        compareWithAndWithoutOpt("case22_join_agg_limit",
            "SELECT a.pt, COUNT(*) AS cnt, SUM(b.id) AS sum_b_id FROM ${tb} a JOIN ${tb2} b ON a.id = b.id WHERE a.pt = '1' GROUP BY a.pt LIMIT 5")

        // --- Case 23: UNION ALL with partition equality + LIMIT ---
        compareWithAndWithoutOpt("case23_union_limit",
            "SELECT * FROM (SELECT id, name FROM ${tb} WHERE pt = '1' UNION ALL SELECT id, name FROM ${tb} WHERE pt = '2') u ORDER BY id LIMIT 10")

        // --- Case 24: window function with partition equality + LIMIT ---
        compareWithAndWithoutOpt("case24_window_func_limit",
            "SELECT id, name, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM ${tb} WHERE pt = '1' ORDER BY id LIMIT 5")

        // --- Case 25: HAVING + aggregation with partition equality + LIMIT ---
        compareWithAndWithoutOpt("case25_having_limit",
            "SELECT dt, COUNT(*) AS cnt FROM ${tb2} WHERE dt = '2026-01-01' GROUP BY dt HAVING COUNT(*) > 0 ORDER BY dt LIMIT 5")

        // ==================== Step 5: Verify optimization ON produces correct data ====================
        // Spot-check: opt ON result must match known data.
        sql """set enable_mc_limit_split_optimization = true"""

        // Single partition has exactly 20 rows
        def partCount = sql """SELECT count(*) FROM ${tb} WHERE pt = '1'"""
        assert partCount[0][0] == 20

        // LIMIT 5 on a single partition should return exactly 5 rows
        def limitResult = sql """SELECT * FROM ${tb} WHERE pt = '1' ORDER BY id LIMIT 5"""
        assert limitResult.size() == 5

        // LIMIT larger than data should return all rows in that partition
        def limitLargeResult = sql """SELECT * FROM ${tb} WHERE pt = '1' ORDER BY id LIMIT 100"""
        assert limitLargeResult.size() == 20

        // Multi-part: dt='2026-01-01' AND region='r1' -> 5 rows
        def multiPartResult = sql """SELECT count(*) FROM ${tb2} WHERE dt = '2026-01-01' AND region = 'r1'"""
        assert multiPartResult[0][0] == 5

        def multiLimitResult = sql """SELECT * FROM ${tb2} WHERE dt = '2026-01-01' AND region = 'r1' ORDER BY id LIMIT 3"""
        assert multiLimitResult.size() == 3

    } finally {
        // Reset the session variable here (not in the try body) so a failed
        // assertion cannot leave the optimization enabled for later suites.
        sql """set enable_mc_limit_split_optimization = false"""
        sql """drop database if exists ${mc_catalog_name}.${db}"""
        // Also remove the temporary catalog created by this suite so repeated
        // runs do not accumulate catalogs.
        sql """drop catalog if exists ${mc_catalog_name}"""
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]