This is an automated email from the ASF dual-hosted git repository.
morrySnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d898a1d90d6 [feature](fe) Push down limit into CTE producer (#63675)
d898a1d90d6 is described below
commit d898a1d90d66a00a0fb465fda51cc867fae79759
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Jun 1 12:26:47 2026 +0800
[feature](fe) Push down limit into CTE producer (#63675)
This PR adds CTE producer-side limit pushdown in Nereids.
When every consumer of a CTE needs only a bounded number of rows, the
optimizer collects the required row count from each consumer, takes the
maximum, and pushes that value into the CTE producer as a limit. The
original consumer-side limits are kept.
The rule only handles safe shapes:
```text
LogicalLimit
  LogicalCTEConsumer
```
```text
LogicalLimit
  LogicalProject
    LogicalCTEConsumer
```
The project must be row-preserving.
## Scenarios
### 1. Direct Limit
```sql
WITH cte AS (
SELECT * FROM orders
)
SELECT * FROM cte
LIMIT 10;
```
The consumer only needs 10 rows, so the CTE producer can produce at most
10 rows.
### 2. Project + Limit
```sql
WITH cte AS (
SELECT order_id, total_price, user_id FROM orders
)
SELECT order_id, total_price
FROM cte
LIMIT 10;
```
A normal project only prunes columns and does not change row count, so
the producer can still be limited to 10 rows.
### 3. Multiple Consumers + Limit
```sql
WITH cte AS (
SELECT * FROM orders
)
(SELECT * FROM cte LIMIT 10)
UNION ALL
(SELECT * FROM cte LIMIT 20);
```
For multiple CTE consumers, the producer limit is:
```text
producerLimit = max(consumerLimit1, consumerLimit2, ...)
```
In this case, the pushed producer limit is 20.
If any consumer needs full CTE data, pushdown is skipped:
```sql
WITH cte AS (
SELECT * FROM orders
)
(SELECT * FROM cte LIMIT 10)
UNION ALL
(SELECT * FROM cte);
```
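The max-or-skip decision described above can be sketched in isolation. This is a simplified illustration, not the code from this commit: the class name `ProducerLimitSketch`, the integer consumer ids, and the early return for an empty consumer list are assumptions made for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical stand-alone sketch; integer ids stand in for consumer relation ids.
final class ProducerLimitSketch {

    /**
     * Returns the limit to push into the producer, or empty when pushdown must be skipped.
     * consumerIds lists every consumer of one CTE; rowsNeeded holds the row count
     * collected for each consumer that sits under a supported limit shape.
     */
    static OptionalLong producerLimit(List<Integer> consumerIds, Map<Integer, Long> rowsNeeded) {
        if (consumerIds.isEmpty()) {
            // Assumption for this sketch: with no consumers there is nothing to bound.
            return OptionalLong.empty();
        }
        long limit = 0;
        for (Integer id : consumerIds) {
            Long rows = rowsNeeded.get(id);
            if (rows == null) {
                // This consumer needs the full CTE output, so no producer limit is safe.
                return OptionalLong.empty();
            }
            limit = Math.max(limit, rows);
        }
        return OptionalLong.of(limit);
    }

    public static void main(String[] args) {
        // Both consumers are bounded: the larger requirement wins.
        System.out.println(producerLimit(List.of(1, 2), Map.of(1, 10L, 2, 20L)));
        // Consumer 2 has no collected limit: pushdown is skipped.
        System.out.println(producerLimit(List.of(1, 2), Map.of(1, 10L)));
    }
}
```

Returning an empty result instead of a sentinel value keeps "no limit" distinct from a legitimate `LIMIT 0`.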
### 4. Limit + Offset
```sql
WITH cte AS (
SELECT * FROM orders
)
SELECT * FROM cte
LIMIT 10 OFFSET 100;
```
The consumer needs to skip 100 rows and then return 10 rows, so the
producer must provide at least 110 rows.
The producer side only truncates rows and does not apply offset:
```text
producerLimit = limit + offset
producerOffset = 0
```
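As a small worked sketch of the arithmetic above: the helper below computes how many rows a consumer requires from the producer. The class name `RowsNeededSketch`, the argument validation, and the saturation to `Long.MAX_VALUE` on overflow are assumptions for illustration; the rule in this commit simply adds limit and offset.

```java
// Hypothetical helper, not part of the commit.
final class RowsNeededSketch {

    /** Rows the producer must supply so a consumer can skip `offset` rows and return `limit` rows. */
    static long rowsNeeded(long limit, long offset) {
        if (limit < 0 || offset < 0) {
            throw new IllegalArgumentException("limit and offset must be non-negative");
        }
        try {
            return Math.addExact(limit, offset);
        } catch (ArithmeticException overflow) {
            // Assumption: treat an overflowing sum as "effectively unbounded".
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        System.out.println(rowsNeeded(10, 100)); // LIMIT 10 OFFSET 100 -> 110
        System.out.println(rowsNeeded(10, 0));   // LIMIT 10 -> 10
    }
}
```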
### 5. SplitLimit
```sql
WITH cte AS (
SELECT * FROM orders
)
SELECT * FROM cte
LIMIT 10 OFFSET 100;
```
Doris may split this into local/global limits. The local limit closest
to the CTE consumer already represents `limit + offset`.
The collector uses the local limit value directly and does not add
offset again.
### 6. Filter + Limit Is Not Matched
```sql
WITH cte AS (
SELECT * FROM orders
)
SELECT * FROM cte
WHERE order_id > 10
LIMIT 10;
```
A filter can discard rows before the limit applies, so the consumer may
need more than 10 producer rows to obtain 10 matches. This rule does not
push limit through filter.
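The following sketch illustrates why truncating the producer would be wrong here. It uses plain Java streams over an in-memory list as a stand-in for the `orders` rows; the class and method names are hypothetical and the data is made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration using in-memory lists instead of a query plan.
final class FilterLimitSketch {

    /** Models "WHERE order_id > threshold LIMIT limit" over the given rows. */
    static List<Integer> filterThenLimit(List<Integer> orderIds, int threshold, int limit) {
        return orderIds.stream()
                .filter(id -> id > threshold)
                .limit(limit)
                .collect(Collectors.toList());
    }

    /** Models a producer-side limit that keeps only the first n rows. */
    static List<Integer> truncate(List<Integer> orderIds, int n) {
        return orderIds.stream().limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> orders = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());

        // Correct: filter the full producer output, then take 10 rows.
        System.out.println(filterThenLimit(orders, 10, 10).size());

        // Incorrect: a producer truncated to 10 rows leaves no row with order_id > 10.
        System.out.println(filterThenLimit(truncate(orders, 10), 10, 10).size());
    }
}
```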
### 7. TopN Is Not Matched
```sql
WITH cte AS (
SELECT * FROM orders
)
SELECT * FROM cte
ORDER BY order_id
LIMIT 10;
```
`ORDER BY ... LIMIT` is TopN. It needs the first N rows after ordering
the full input, so truncating the producer beforehand could drop rows
that belong in the result. It cannot be treated as a normal limit.
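A minimal sketch of the difference, again with in-memory lists standing in for the CTE output. The class name, method names, and sample values are assumptions for illustration only.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration: TopN over full input versus over a truncated producer.
final class TopNSketch {

    /** Models "ORDER BY order_id LIMIT n". */
    static List<Integer> topN(List<Integer> orderIds, int n) {
        return orderIds.stream().sorted().limit(n).collect(Collectors.toList());
    }

    /** Models a producer-side limit that keeps only the first n rows in arrival order. */
    static List<Integer> truncate(List<Integer> orderIds, int n) {
        return orderIds.stream().limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> orders = List.of(5, 3, 9, 1, 7);

        // Correct: sort everything, then keep the two smallest ids.
        System.out.println(topN(orders, 2));

        // Incorrect: the truncated producer never emits order_id 1.
        System.out.println(topN(truncate(orders, 2), 2));
    }
}
```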
### 8. Join / Aggregate / Window / Sort Are Not Matched
```sql
WITH cte AS (
SELECT * FROM orders
)
SELECT *
FROM cte JOIN users ON cte.user_id = users.user_id
LIMIT 10;
```
```sql
WITH cte AS (
SELECT * FROM orders
)
SELECT user_id, COUNT(*)
FROM cte
GROUP BY user_id
LIMIT 10;
```
```sql
WITH cte AS (
SELECT * FROM orders
)
SELECT *
FROM (
SELECT order_id, ROW_NUMBER() OVER (ORDER BY order_id) AS rn
FROM cte
) t
LIMIT 10;
```
```sql
WITH cte AS (
SELECT * FROM orders
)
SELECT *
FROM (
SELECT * FROM cte ORDER BY order_id
) t
LIMIT 10;
```
These operators can change row cardinality or ordering semantics. Unless
other rules have already rewritten the plan into `Limit -> CTEConsumer`
or `Limit -> Project -> CTEConsumer`, this collector skips them.
---
.../org/apache/doris/nereids/CascadesContext.java | 8 +
.../org/apache/doris/nereids/StatementContext.java | 13 ++
.../doris/nereids/jobs/executor/Rewriter.java | 3 +
.../org/apache/doris/nereids/rules/RuleType.java | 1 +
.../rules/rewrite/CollectLimitAboveConsumer.java | 58 +++++++
.../nereids/rules/rewrite/RewriteCteChildren.java | 16 ++
.../rewrite/CollectLimitAboveConsumerTest.java | 109 +++++++++++++
.../rules/rewrite/CteLimitPushdownPlanTest.java | 167 ++++++++++++++++++++
.../RewriteCteChildrenLimitPushdownTest.java | 86 ++++++++++
.../limit_push_down/limit_push_down.out | 6 +-
.../limit_push_down/order_push_down.out | 6 +-
.../test_cte_limit_pushdown.groovy | 175 +++++++++++++++++++++
12 files changed, 644 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
index e1036af9208..8d58732eef9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -483,6 +483,14 @@ public class CascadesContext implements ScheduleContext {
return this.statementContext.getConsumerIdToFilters();
}
+ public void putConsumerIdToLimitRows(RelationId id, long rows) {
+ this.statementContext.getConsumerIdToLimitRows().merge(id, rows, Math::max);
+ }
+
+ public Map<RelationId, Long> getConsumerIdToLimitRows() {
+ return this.statementContext.getConsumerIdToLimitRows();
+ }
+
public void addCTEConsumerGroup(CTEId cteId, Group g, Multimap<Slot, Slot>
producerSlotToConsumerSlot) {
List<Pair<Multimap<Slot, Slot>, Group>> consumerGroups =
this.statementContext.getCteIdToConsumerGroup().computeIfAbsent(cteId, k -> new
ArrayList<>());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index 3659b1f79f5..37a7a2b3768 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -168,6 +168,7 @@ public class StatementContext implements Closeable {
private final Map<CTEId, LogicalCTEProducer<? extends Plan>>
cteIdToProducer = new HashMap<>();
private final Map<RelationId, Set<Expression>> consumerIdToFilters = new
HashMap<>();
+ private final Map<RelationId, Long> consumerIdToLimitRows = new HashMap<>();
// Used to update consumer's stats
private final Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>>
cteIdToConsumerGroup = new HashMap<>();
private final Map<CTEId, LogicalPlan> rewrittenCteProducer = new
HashMap<>();
@@ -644,6 +645,10 @@ public class StatementContext implements Closeable {
return consumerIdToFilters;
}
+ public Map<RelationId, Long> getConsumerIdToLimitRows() {
+ return consumerIdToLimitRows;
+ }
+
public PlaceholderId getNextPlaceholderId() {
return placeHolderIdGenerator.getNextId();
}
@@ -674,6 +679,7 @@ public class StatementContext implements Closeable {
cteIdToOutputIds.clear();
cteIdToProducer.clear();
consumerIdToFilters.clear();
+ consumerIdToLimitRows.clear();
cteIdToConsumerGroup.clear();
rewrittenCteProducer.clear();
rewrittenCteConsumer.clear();
@@ -688,6 +694,7 @@ public class StatementContext implements Closeable {
copyMapOfSets(cteIdToOutputIds),
new HashMap<>(cteIdToProducer),
copyMapOfSets(consumerIdToFilters),
+ new HashMap<>(consumerIdToLimitRows),
copyMapOfLists(cteIdToConsumerGroup),
new HashMap<>(rewrittenCteProducer),
new HashMap<>(rewrittenCteConsumer));
@@ -707,6 +714,9 @@ public class StatementContext implements Closeable {
consumerIdToFilters.clear();
consumerIdToFilters.putAll(snapshot.consumerIdToFilters);
+ consumerIdToLimitRows.clear();
+ consumerIdToLimitRows.putAll(snapshot.consumerIdToLimitRows);
+
cteIdToConsumerGroup.clear();
cteIdToConsumerGroup.putAll(snapshot.cteIdToConsumerGroup);
@@ -739,6 +749,7 @@ public class StatementContext implements Closeable {
private final Map<CTEId, Set<Slot>> cteIdToOutputIds;
private final Map<CTEId, LogicalCTEProducer<? extends Plan>>
cteIdToProducer;
private final Map<RelationId, Set<Expression>> consumerIdToFilters;
+ private final Map<RelationId, Long> consumerIdToLimitRows;
private final Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>>
cteIdToConsumerGroup;
private final Map<CTEId, LogicalPlan> rewrittenCteProducer;
private final Map<CTEId, LogicalPlan> rewrittenCteConsumer;
@@ -751,6 +762,7 @@ public class StatementContext implements Closeable {
Map<CTEId, Set<Slot>> cteIdToOutputIds,
Map<CTEId, LogicalCTEProducer<? extends Plan>> cteIdToProducer,
Map<RelationId, Set<Expression>> consumerIdToFilters,
+ Map<RelationId, Long> consumerIdToLimitRows,
Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>>
cteIdToConsumerGroup,
Map<CTEId, LogicalPlan> rewrittenCteProducer,
Map<CTEId, LogicalPlan> rewrittenCteConsumer) {
@@ -758,6 +770,7 @@ public class StatementContext implements Closeable {
this.cteIdToOutputIds = cteIdToOutputIds;
this.cteIdToProducer = cteIdToProducer;
this.consumerIdToFilters = consumerIdToFilters;
+ this.consumerIdToLimitRows = consumerIdToLimitRows;
this.cteIdToConsumerGroup = cteIdToConsumerGroup;
this.rewrittenCteProducer = rewrittenCteProducer;
this.rewrittenCteConsumer = rewrittenCteConsumer;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index e118b21b7e9..f82ff9eeab4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -52,6 +52,7 @@ import org.apache.doris.nereids.rules.rewrite.CheckScoreUsage;
import org.apache.doris.nereids.rules.rewrite.ClearContextStatus;
import org.apache.doris.nereids.rules.rewrite.CollectCteConsumerOutput;
import org.apache.doris.nereids.rules.rewrite.CollectFilterAboveConsumer;
+import org.apache.doris.nereids.rules.rewrite.CollectLimitAboveConsumer;
import org.apache.doris.nereids.rules.rewrite.CollectPredicateOnScan;
import org.apache.doris.nereids.rules.rewrite.ColumnPruning;
import org.apache.doris.nereids.rules.rewrite.ConstantPropagation;
@@ -395,6 +396,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
topic("Push project and filter on cte consumer to
cte producer",
topDown(
new CollectFilterAboveConsumer(),
+ new CollectLimitAboveConsumer(),
new CollectCteConsumerOutput())
),
topic("eliminate join according unique or foreign
key",
@@ -775,6 +777,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
topic("Push project and filter on cte consumer to cte
producer",
topDown(
new CollectFilterAboveConsumer(),
+ new CollectLimitAboveConsumer(),
new CollectCteConsumerOutput()
)
),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 71e7514025e..26d40c59d1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -400,6 +400,7 @@ public enum RuleType {
CTE_INLINE(RuleTypeClass.REWRITE),
REWRITE_CTE_CHILDREN(RuleTypeClass.REWRITE),
COLLECT_FILTER_ABOVE_CTE_CONSUMER(RuleTypeClass.REWRITE),
+ COLLECT_LIMIT_ABOVE_CTE_CONSUMER(RuleTypeClass.REWRITE),
INLINE_VIEW(RuleTypeClass.REWRITE),
CHECK_PRIVILEGES(RuleTypeClass.REWRITE),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectLimitAboveConsumer.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectLimitAboveConsumer.java
new file mode 100644
index 00000000000..bb573304e3a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CollectLimitAboveConsumer.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Collect limit rows needed by CTE consumers.
+ */
+public class CollectLimitAboveConsumer implements RewriteRuleFactory {
+
+ @Override
+ public List<Rule> buildRules() {
+ return ImmutableList.of(
+ logicalLimit(logicalCTEConsumer()).thenApply(ctx -> {
+ LogicalLimit<LogicalCTEConsumer> limit = ctx.root;
+ collectLimitRows(ctx.cascadesContext, limit, limit.child());
+ return ctx.root;
+ }).toRule(RuleType.COLLECT_LIMIT_ABOVE_CTE_CONSUMER),
+ logicalLimit(logicalProject(logicalCTEConsumer()))
+ .thenApply(ctx -> {
+ LogicalLimit<LogicalProject<LogicalCTEConsumer>> limit = ctx.root;
+ collectLimitRows(ctx.cascadesContext, limit, limit.child().child());
+ return ctx.root;
+ }).toRule(RuleType.COLLECT_LIMIT_ABOVE_CTE_CONSUMER)
+ );
+ }
+
+ private void collectLimitRows(CascadesContext cascadesContext, LogicalLimit<?> limit,
+ LogicalCTEConsumer cteConsumer) {
+ cascadesContext.putConsumerIdToLimitRows(
+ cteConsumer.getRelationId(), limit.getLimit() + limit.getOffset());
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java
index 29c83e24bf5..c957a9e853f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java
@@ -28,12 +28,14 @@ import
org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
@@ -124,6 +126,7 @@ public class RewriteCteChildren extends
DefaultPlanRewriter<CascadesContext> imp
} else {
child = (LogicalPlan) cteProducer.child();
child = tryToConstructFilter(cascadesContext,
cteProducer.getCteId(), child);
+ child = tryToConstructLimit(cascadesContext, cteProducer.getCteId(), child);
Set<Slot> producerOutputs = cascadesContext.getStatementContext()
.getCteIdToOutputIds().get(cteProducer.getCteId());
if (producerOutputs != null && producerOutputs.size() <
child.getOutput().size()) {
@@ -162,6 +165,19 @@ public class RewriteCteChildren extends
DefaultPlanRewriter<CascadesContext> imp
return plan;
}
+ private LogicalPlan tryToConstructLimit(CascadesContext cascadesContext, CTEId cteId, LogicalPlan child) {
+ Set<LogicalCTEConsumer> consumers = cascadesContext.getCteIdToConsumers().get(cteId);
+ long limit = 0;
+ for (LogicalCTEConsumer consumer : consumers) {
+ Long rowsNeeded = cascadesContext.getConsumerIdToLimitRows().get(consumer.getRelationId());
+ if (rowsNeeded == null) {
+ return child;
+ }
+ limit = Math.max(limit, rowsNeeded);
+ }
+ return pushPlanUnderAnchor(new LogicalLimit<>(limit, 0, LimitPhase.ORIGIN, child));
+ }
+
/*
* An expression can only be pushed down if it has filter expressions on
all consumers that reference the slot.
* For example, let's assume a producer has two consumers, consumer1 and
consumer2:
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectLimitAboveConsumerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectLimitAboveConsumerTest.java
new file mode 100644
index 00000000000..94532027e66
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CollectLimitAboveConsumerTest.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.trees.expressions.CTEId;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
+import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.util.MemoTestUtils;
+import org.apache.doris.nereids.util.PlanConstructor;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for {@link CollectLimitAboveConsumer}.
+ */
+class CollectLimitAboveConsumerTest {
+
+ @Test
+ void testCollectDirectLimitRowsNeeded() {
+ LogicalOlapScan producerPlan = PlanConstructor.newLogicalOlapScan(0,
"t1", 0);
+ LogicalCTEConsumer consumer = new LogicalCTEConsumer(
+ PlanConstructor.getNextRelationId(), new CTEId(1), "cte1",
producerPlan);
+ LogicalLimit<LogicalCTEConsumer> limit = new LogicalLimit<>(10, 5,
LimitPhase.ORIGIN, consumer);
+
+ CascadesContext cascadesContext =
MemoTestUtils.createCascadesContext(new ConnectContext(), limit);
+ Rule rule = new CollectLimitAboveConsumer().buildRules().get(0);
+ rule.transform(limit, cascadesContext);
+
+ Map<RelationId, Long> collected =
cascadesContext.getStatementContext().getConsumerIdToLimitRows();
+ Assertions.assertEquals(15L, collected.get(consumer.getRelationId()));
+ }
+
+ @Test
+ void testCollectLocalLimitRowsNeededWithoutAddingOffsetAgain() {
+ LogicalOlapScan producerPlan = PlanConstructor.newLogicalOlapScan(1,
"t2", 0);
+ LogicalCTEConsumer consumer = new LogicalCTEConsumer(
+ PlanConstructor.getNextRelationId(), new CTEId(2), "cte2",
producerPlan);
+ LogicalLimit<LogicalCTEConsumer> limit = new LogicalLimit<>(15, 0,
LimitPhase.LOCAL, consumer);
+
+ CascadesContext cascadesContext =
MemoTestUtils.createCascadesContext(new ConnectContext(), limit);
+ Rule rule = new CollectLimitAboveConsumer().buildRules().get(0);
+ rule.transform(limit, cascadesContext);
+
+ Map<RelationId, Long> collected =
cascadesContext.getStatementContext().getConsumerIdToLimitRows();
+ Assertions.assertEquals(15L, collected.get(consumer.getRelationId()));
+ }
+
+ @Test
+ void testKeepMaxRowsNeededWhenConsumerIsCollectedMultipleTimes() {
+ LogicalOlapScan producerPlan = PlanConstructor.newLogicalOlapScan(10,
"t_merge", 0);
+ LogicalCTEConsumer consumer = new LogicalCTEConsumer(
+ PlanConstructor.getNextRelationId(), new CTEId(10),
"cte_merge", producerPlan);
+ LogicalLimit<LogicalCTEConsumer> highLimit = new LogicalLimit<>(20, 0,
LimitPhase.ORIGIN, consumer);
+ LogicalLimit<LogicalCTEConsumer> lowLimit = new LogicalLimit<>(3, 0,
LimitPhase.ORIGIN, consumer);
+
+ CascadesContext cascadesContext =
MemoTestUtils.createCascadesContext(new ConnectContext(), highLimit);
+ Rule rule = new CollectLimitAboveConsumer().buildRules().get(0);
+ rule.transform(highLimit, cascadesContext);
+ rule.transform(lowLimit, cascadesContext);
+
+ Map<RelationId, Long> collected =
cascadesContext.getStatementContext().getConsumerIdToLimitRows();
+ Assertions.assertEquals(20L, collected.get(consumer.getRelationId()));
+ }
+
+ @Test
+ void testCollectLimitAboveProjectRowsNeeded() {
+ LogicalOlapScan producerPlan = PlanConstructor.newLogicalOlapScan(2,
"t3", 0);
+ LogicalCTEConsumer consumer = new LogicalCTEConsumer(
+ PlanConstructor.getNextRelationId(), new CTEId(3), "cte3",
producerPlan);
+ LogicalProject<LogicalCTEConsumer> project = new LogicalProject<>(
+ ImmutableList.copyOf(consumer.getOutput()), consumer);
+ LogicalLimit<LogicalProject<LogicalCTEConsumer>> limit = new
LogicalLimit<>(
+ 7, 0, LimitPhase.LOCAL, project);
+
+ CascadesContext cascadesContext =
MemoTestUtils.createCascadesContext(new ConnectContext(), limit);
+ List<Rule> rules = new CollectLimitAboveConsumer().buildRules();
+ rules.get(1).transform(limit, cascadesContext);
+
+ Map<RelationId, Long> collected =
cascadesContext.getStatementContext().getConsumerIdToLimitRows();
+ Assertions.assertEquals(7L, collected.get(consumer.getRelationId()));
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CteLimitPushdownPlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CteLimitPushdownPlanTest.java
new file mode 100644
index 00000000000..fae1be1f6d4
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CteLimitPushdownPlanTest.java
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.util.MemoPatternMatchSupported;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Planner-level tests for CTE limit pushdown.
+ */
+class CteLimitPushdownPlanTest extends TestWithFeService implements
MemoPatternMatchSupported {
+
+ @Override
+ protected void runBeforeAll() throws Exception {
+ createDatabase("test");
+ useDatabase("test");
+ connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+
+ createTable("CREATE TABLE cte_limit_pushdown_t (\n"
+ + " k1 int NULL,\n"
+ + " k2 int NULL\n"
+ + ") ENGINE=OLAP\n"
+ + "DISTRIBUTED BY HASH(k1) BUCKETS 1\n"
+ + "PROPERTIES (\n"
+ + " \"replication_allocation\" = \"tag.location.default:
1\"\n"
+ + ");");
+ }
+
+ @Override
+ protected void runBeforeEach() throws Exception {
+ StatementScopeIdGenerator.clear();
+ }
+
+ @Test
+ void testPushLimitWithOffsetToProducer() {
+ String sql = "WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_t) "
+ + "(SELECT * FROM cte LIMIT 10 OFFSET 5) "
+ + "UNION ALL "
+ + "(SELECT * FROM cte LIMIT 3)";
+
+ PlanChecker.from(connectContext)
+ .analyze(sql)
+ .rewrite()
+ .matches(logicalCTEProducer(
+ logicalLimit().when(limit -> limit.getLimit() == 15 &&
limit.getOffset() == 0)));
+ }
+
+ @Test
+ void testPushLimitBeforeProducerOutputPruning() {
+ String sql = "WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_t) "
+ + "(SELECT k1 FROM cte LIMIT 7) "
+ + "UNION ALL "
+ + "(SELECT k1 FROM cte LIMIT 3)";
+
+ PlanChecker.from(connectContext)
+ .analyze(sql)
+ .rewrite()
+ .matches(logicalCTEProducer(
+ logicalLimit().when(limit -> limit.getLimit() == 7 &&
limit.getOffset() == 0)));
+ }
+
+ @Test
+ void testPushMaxLimitForAllLimitedConsumers() {
+ String sql = "WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_t) "
+ + "(SELECT * FROM cte LIMIT 10 OFFSET 5) "
+ + "UNION ALL "
+ + "(SELECT * FROM cte LIMIT 20)";
+
+ PlanChecker.from(connectContext)
+ .analyze(sql)
+ .rewrite()
+ .matches(logicalCTEProducer(
+ logicalLimit().when(limit -> limit.getLimit() == 20 &&
limit.getOffset() == 0)));
+ }
+
+ @Test
+ void testSkipProducerLimitWhenAnyConsumerNeedsFullRows() {
+ String sql = "WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_t) "
+ + "(SELECT * FROM cte LIMIT 10) "
+ + "UNION ALL "
+ + "(SELECT * FROM cte)";
+
+ assertNoProducerLimit(sql);
+ }
+
+ @Test
+ void testSkipProducerLimitWhenLimitIsAboveFilter() {
+ String sql = "WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_t) "
+ + "(SELECT * FROM cte WHERE k1 > 1 LIMIT 10) "
+ + "UNION ALL "
+ + "(SELECT * FROM cte LIMIT 3)";
+
+ assertNoProducerLimit(sql);
+ }
+
+ @Test
+ void testSkipProducerLimitWhenConsumerUsesTopN() {
+ String sql = "WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_t) "
+ + "(SELECT * FROM (SELECT * FROM cte ORDER BY k1 LIMIT 10)
topn_branch) "
+ + "UNION ALL "
+ + "(SELECT * FROM cte LIMIT 3)";
+
+ assertNoProducerLimit(sql);
+ }
+
+ @Test
+ void testSkipProducerLimitWhenLimitIsAboveJoin() {
+ String sql = "WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_t) "
+ + "(SELECT c.k1, c.k2 FROM cte c "
+ + "JOIN cte_limit_pushdown_t t ON c.k1 = t.k1 LIMIT 10) "
+ + "UNION ALL "
+ + "(SELECT * FROM cte LIMIT 3)";
+
+ assertNoProducerLimit(sql);
+ }
+
+ @Test
+ void testSkipProducerLimitWhenLimitIsAboveAggregate() {
+ String sql = "WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_t) "
+ + "(SELECT k1, COUNT(*) FROM cte GROUP BY k1 LIMIT 10) "
+ + "UNION ALL "
+ + "(SELECT * FROM cte LIMIT 3)";
+
+ assertNoProducerLimit(sql);
+ }
+
+ @Test
+ void testSkipProducerLimitWhenLimitIsAboveWindow() {
+ String sql = "WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_t) "
+ + "(SELECT k1, rn FROM ("
+ + "SELECT k1, ROW_NUMBER() OVER (ORDER BY k1) AS rn FROM cte"
+ + ") window_branch LIMIT 10) "
+ + "UNION ALL "
+ + "(SELECT * FROM cte LIMIT 3)";
+
+ assertNoProducerLimit(sql);
+ }
+
+ private void assertNoProducerLimit(String sql) {
+ PlanChecker.from(connectContext)
+ .analyze(sql)
+ .rewrite()
+ .matches(logicalCTEProducer())
+ .nonMatch(logicalCTEProducer(logicalLimit()))
+ .nonMatch(logicalCTEProducer(logicalLimit(logicalProject())))
+ .nonMatch(logicalCTEProducer(logicalProject(logicalLimit())));
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildrenLimitPushdownTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildrenLimitPushdownTest.java
new file mode 100644
index 00000000000..fb7a07e47b6
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildrenLimitPushdownTest.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.expressions.CTEId;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.util.MemoTestUtils;
+import org.apache.doris.nereids.util.PlanConstructor;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests producer-side CTE limit construction in {@link RewriteCteChildren}.
+ */
+class RewriteCteChildrenLimitPushdownTest {
+
+ @Test
+ void testPushMaxConsumerLimitToProducer() {
+ LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
+ CTEId cteId = new CTEId(1);
+ LogicalCTEProducer<LogicalOlapScan> producer = new
LogicalCTEProducer<>(cteId, scan);
+ LogicalCTEConsumer consumer1 = new LogicalCTEConsumer(
+ PlanConstructor.getNextRelationId(), cteId, "cte1", producer);
+ LogicalCTEConsumer consumer2 = new LogicalCTEConsumer(
+ PlanConstructor.getNextRelationId(), cteId, "cte1", producer);
+ CascadesContext cascadesContext =
MemoTestUtils.createCascadesContext(new ConnectContext(), producer);
+ cascadesContext.getCteIdToConsumers().put(cteId,
ImmutableSet.of(consumer1, consumer2));
+ cascadesContext.putConsumerIdToLimitRows(consumer1.getRelationId(),
10L);
+ cascadesContext.putConsumerIdToLimitRows(consumer2.getRelationId(),
20L);
+
+ Plan rewritten = new RewriteCteChildren(ImmutableList.of(), false)
+ .visitLogicalCTEProducer(producer, cascadesContext);
+
+ LogicalCTEProducer<?> rewrittenProducer = (LogicalCTEProducer<?>)
rewritten;
+ Assertions.assertInstanceOf(LogicalLimit.class,
rewrittenProducer.child());
+ LogicalLimit<?> limit = (LogicalLimit<?>) rewrittenProducer.child();
+ Assertions.assertEquals(20L, limit.getLimit());
+ Assertions.assertEquals(0L, limit.getOffset());
+ Assertions.assertEquals(LimitPhase.ORIGIN, limit.getPhase());
+ }
+
+ @Test
+ void testSkipProducerLimitWhenAnyConsumerHasNoLimit() {
+ LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(1, "t2", 0);
+ CTEId cteId = new CTEId(2);
+ LogicalCTEProducer<LogicalOlapScan> producer = new
LogicalCTEProducer<>(cteId, scan);
+ LogicalCTEConsumer consumer1 = new LogicalCTEConsumer(
+ PlanConstructor.getNextRelationId(), cteId, "cte2", producer);
+ LogicalCTEConsumer consumer2 = new LogicalCTEConsumer(
+ PlanConstructor.getNextRelationId(), cteId, "cte2", producer);
+ CascadesContext cascadesContext = MemoTestUtils.createCascadesContext(new ConnectContext(), producer);
+ cascadesContext.getCteIdToConsumers().put(cteId, ImmutableSet.of(consumer1, consumer2));
+ cascadesContext.putConsumerIdToLimitRows(consumer1.getRelationId(), 10L);
+
+ Plan rewritten = new RewriteCteChildren(ImmutableList.of(), false)
+ .visitLogicalCTEProducer(producer, cascadesContext);
+
+ LogicalCTEProducer<?> rewrittenProducer = (LogicalCTEProducer<?>) rewritten;
+ Assertions.assertSame(scan, rewrittenProducer.child());
+ }
+}
diff --git a/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out b/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out
index f0d90758204..2b3d526546e 100644
--- a/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out
+++ b/regression-test/data/nereids_rules_p0/limit_push_down/limit_push_down.out
@@ -399,8 +399,10 @@ PhysicalResultSink
-- !limit_cte_query --
PhysicalCteAnchor ( cteId=CTEId#0 )
--PhysicalCteProducer ( cteId=CTEId#0 )
-----filter((t1.id < 10))
-------PhysicalOlapScan[t1]
+----PhysicalLimit[GLOBAL]
+------PhysicalLimit[LOCAL]
+--------filter((t1.id < 10))
+----------PhysicalOlapScan[t1]
--PhysicalResultSink
----PhysicalLimit[GLOBAL]
------PhysicalLimit[LOCAL]
diff --git a/regression-test/data/nereids_rules_p0/limit_push_down/order_push_down.out b/regression-test/data/nereids_rules_p0/limit_push_down/order_push_down.out
index fbed0a9f889..8ab46e582e5 100644
--- a/regression-test/data/nereids_rules_p0/limit_push_down/order_push_down.out
+++ b/regression-test/data/nereids_rules_p0/limit_push_down/order_push_down.out
@@ -491,8 +491,10 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
-- !limit_cte_outside_query --
PhysicalCteAnchor ( cteId=CTEId#0 )
--PhysicalCteProducer ( cteId=CTEId#0 )
-----filter((t1.id < 10))
-------PhysicalOlapScan[t1]
+----PhysicalLimit[GLOBAL]
+------PhysicalLimit[LOCAL]
+--------filter((t1.id < 10))
+----------PhysicalOlapScan[t1]
--PhysicalResultSink
----PhysicalLimit[GLOBAL]
------PhysicalLimit[LOCAL]
diff --git a/regression-test/suites/nereids_rules_p0/cte_limit_pushdown/test_cte_limit_pushdown.groovy b/regression-test/suites/nereids_rules_p0/cte_limit_pushdown/test_cte_limit_pushdown.groovy
new file mode 100644
index 00000000000..863b1b1a3f1
--- /dev/null
+++ b/regression-test/suites/nereids_rules_p0/cte_limit_pushdown/test_cte_limit_pushdown.groovy
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cte_limit_pushdown") {
+ sql "SET enable_nereids_planner=true"
+ sql "SET enable_fallback_to_original_planner=false"
+ sql "SET disable_nereids_rules='PRUNE_EMPTY_PARTITION'"
+
+ sql "DROP TABLE IF EXISTS cte_limit_pushdown_regression_t"
+ sql """
+ CREATE TABLE cte_limit_pushdown_regression_t (
+ k1 int NULL,
+ k2 int NULL
+ )
+ DUPLICATE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ sql """
+ INSERT INTO cte_limit_pushdown_regression_t VALUES
+ (1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (6, 60)
+ """
+
+ def cteProducerFragment = { explainString ->
+ int multicast = explainString.indexOf("MultiCastDataSinks")
+ assert multicast >= 0
+ int fragmentStart = explainString.lastIndexOf("PLAN FRAGMENT", multicast)
+ assert fragmentStart >= 0
+ int fragmentEnd = explainString.indexOf("PLAN FRAGMENT", multicast + 1)
+ if (fragmentEnd < 0) {
+ fragmentEnd = explainString.length()
+ }
+ return explainString.substring(fragmentStart, fragmentEnd)
+ }
+
+ def cteProducerSourceBlock = { explainString ->
+ int multicast = explainString.indexOf("MultiCastDataSinks")
+ assert multicast >= 0
+ int scanStart = explainString.indexOf(":VOlapScanNode", multicast)
+ assert scanStart >= 0
+ int nextFragment = explainString.indexOf("PLAN FRAGMENT", scanStart + 1)
+ if (nextFragment < 0) {
+ nextFragment = explainString.length()
+ }
+ return explainString.substring(scanStart, nextFragment)
+ }
+
+ def hasExactLimit = { planBlock, expectedLimit ->
+ return planBlock.readLines().any { line -> line.trim() == "limit: ${expectedLimit}" }
+ }
+
+ def hasAnyLimit = { planBlock ->
+ return planBlock.readLines().any { line -> line.trim().startsWith("limit: ") }
+ }
+
+ def assertProducerLimit = { explainString, expectedLimit ->
+ String producerFragment = cteProducerFragment(explainString)
+ String producerSource = cteProducerSourceBlock(explainString)
+ assert producerFragment.contains("MultiCastDataSinks")
+ assert producerSource.contains("cte_limit_pushdown_regression_t")
+ assert hasExactLimit(producerFragment, expectedLimit)
+ assert hasExactLimit(producerSource, expectedLimit)
+ return true
+ }
+
+ def assertNoProducerLimit = { explainString ->
+ String producerFragment = cteProducerFragment(explainString)
+ String producerSource = cteProducerSourceBlock(explainString)
+ assert producerFragment.contains("MultiCastDataSinks")
+ assert producerSource.contains("cte_limit_pushdown_regression_t")
+ assert !hasAnyLimit(producerFragment)
+ assert !hasAnyLimit(producerSource)
+ return true
+ }
+
+ explain {
+ sql """
+ WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_regression_t)
+ (SELECT * FROM cte LIMIT 10 OFFSET 5)
+ UNION ALL
+ (SELECT * FROM cte LIMIT 3)
+ """
+ check { explainString -> assertProducerLimit(explainString, 15) }
+ }
+
+ explain {
+ sql """
+ WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_regression_t)
+ (SELECT k1 FROM cte LIMIT 7)
+ UNION ALL
+ (SELECT k1 FROM cte LIMIT 3)
+ """
+ check { explainString -> assertProducerLimit(explainString, 7) }
+ }
+
+ explain {
+ sql """
+ WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_regression_t)
+ (SELECT * FROM cte LIMIT 10)
+ UNION ALL
+ (SELECT * FROM cte)
+ """
+ check { explainString -> assertNoProducerLimit(explainString) }
+ }
+
+ explain {
+ sql """
+ WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_regression_t)
+ (SELECT * FROM cte WHERE k1 > 1 LIMIT 10)
+ UNION ALL
+ (SELECT * FROM cte LIMIT 3)
+ """
+ check { explainString -> assertNoProducerLimit(explainString) }
+ }
+
+ explain {
+ sql """
+ WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_regression_t)
+ (SELECT * FROM (SELECT * FROM cte ORDER BY k1 LIMIT 10) topn_branch)
+ UNION ALL
+ (SELECT * FROM cte LIMIT 3)
+ """
+ check { explainString -> assertNoProducerLimit(explainString) }
+ }
+
+ explain {
+ sql """
+ WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_regression_t)
+ (SELECT c.k1, c.k2 FROM cte c
+ JOIN cte_limit_pushdown_regression_t t ON c.k1 = t.k1 LIMIT 10)
+ UNION ALL
+ (SELECT * FROM cte LIMIT 3)
+ """
+ check { explainString -> assertNoProducerLimit(explainString) }
+ }
+
+ explain {
+ sql """
+ WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_regression_t)
+ (SELECT k1, COUNT(*) FROM cte GROUP BY k1 LIMIT 10)
+ UNION ALL
+ (SELECT * FROM cte LIMIT 3)
+ """
+ check { explainString -> assertNoProducerLimit(explainString) }
+ }
+
+ explain {
+ sql """
+ WITH cte AS (SELECT k1, k2 FROM cte_limit_pushdown_regression_t)
+ (SELECT k1, rn FROM (
+ SELECT k1, ROW_NUMBER() OVER (ORDER BY k1) AS rn FROM cte
+ ) window_branch LIMIT 10)
+ UNION ALL
+ (SELECT * FROM cte LIMIT 3)
+ """
+ check { explainString -> assertNoProducerLimit(explainString) }
+ }
+}