This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d513b11eb4e [feature](nereids)support correlated scalar subquery 
without scalar agg (#39471)
d513b11eb4e is described below

commit d513b11eb4e23cc583f6048aada1baae5dcc3427
Author: starocean999 <40539150+starocean...@users.noreply.github.com>
AuthorDate: Fri Sep 20 09:34:26 2024 +0800

    [feature](nereids)support correlated scalar subquery without scalar agg 
(#39471)
    
    support correlated scalar subquery without scalar agg like:
    `select t1.c1 from t1
        where t1.c2 > (select t2.c2 from t2 where t1.c1 = t2.c1);`
    after this pr, nereids would produce a correct plan for above sql.
---
 .../doris/nereids/jobs/executor/Rewriter.java      |   7 +-
 .../post/PushDownFilterThroughProject.java         |   3 +-
 .../doris/nereids/processor/post/Validator.java    |   3 +-
 .../nereids/rules/analysis/ExpressionAnalyzer.java |  15 --
 .../nereids/rules/analysis/FillUpMissingSlots.java |  73 ++++--
 .../nereids/rules/analysis/SubExprAnalyzer.java    | 259 +++++++++++++++++----
 .../nereids/rules/analysis/SubqueryToApply.java    | 160 ++++++++++---
 .../expression/rules/FoldConstantRuleOnFE.java     |  19 +-
 ...CorrelatedFilterUnderApplyAggregateProject.java |  23 +-
 .../rules/rewrite/PullUpProjectUnderApply.java     |  10 +-
 .../rewrite/PushDownFilterThroughProject.java      |   4 +-
 .../nereids/trees/expressions/ScalarSubquery.java  |  62 ++++-
 .../expressions/functions/AlwaysNotNullable.java   |   9 +
 ...ysNotNullable.java => NoneMovableFunction.java} |  11 +-
 .../trees/expressions/functions/agg/ArrayAgg.java  |   7 +
 .../trees/expressions/functions/agg/BitmapAgg.java |   6 +
 .../expressions/functions/agg/BitmapIntersect.java |   6 +
 .../expressions/functions/agg/BitmapUnion.java     |   6 +
 .../functions/agg/BitmapUnionCount.java            |   7 +
 .../expressions/functions/agg/BitmapUnionInt.java  |   6 +
 .../expressions/functions/agg/CollectList.java     |   7 +
 .../expressions/functions/agg/CollectSet.java      |   7 +
 .../trees/expressions/functions/agg/Count.java     |   6 +
 .../expressions/functions/agg/CountByEnum.java     |   6 +
 .../functions/agg/GroupArrayIntersect.java         |   7 +
 .../trees/expressions/functions/agg/Histogram.java |   6 +
 .../trees/expressions/functions/agg/HllUnion.java  |   6 +
 .../expressions/functions/agg/HllUnionAgg.java     |   6 +
 .../expressions/functions/agg/IntersectCount.java  |   6 +
 .../trees/expressions/functions/agg/MapAgg.java    |   7 +
 .../functions/agg/MultiDistinctCount.java          |   6 +
 .../functions/agg/MultiDistinctSum0.java           |  23 ++
 .../trees/expressions/functions/agg/Ndv.java       |   6 +
 .../expressions/functions/agg/PercentileArray.java |   7 +
 .../expressions/functions/agg/QuantileUnion.java   |   6 +
 .../expressions/functions/agg/SequenceCount.java   |   6 +
 .../trees/expressions/functions/agg/Sum0.java      |  22 ++
 .../expressions/functions/scalar/AssertTrue.java   |   3 +-
 .../org/apache/doris/nereids/trees/plans/Plan.java |   4 +-
 .../doris/nereids/trees/plans/algebra/Project.java |  22 +-
 .../trees/plans/logical/LogicalProject.java        |  12 +-
 .../nereids/rules/expression/FoldConstantTest.java |  16 ++
 .../data/nereids_hint_tpcds_p0/shape/query41.out   |   2 +-
 .../subquery/correlated_scalar_subquery.out        | 108 +++++++++
 .../shape/query41.out                              |   2 +-
 .../noStatsRfPrune/query41.out                     |   2 +-
 .../no_stats_shape/query41.out                     |   2 +-
 .../rf_prune/query41.out                           |   2 +-
 .../nereids_tpcds_shape_sf100_p0/shape/query41.out |   2 +-
 .../tpcds_sf100/noStatsRfPrune/query41.out         |   2 +-
 .../tpcds_sf100/no_stats_shape/query41.out         |   2 +-
 .../new_shapes_p0/tpcds_sf100/rf_prune/query41.out |   2 +-
 .../new_shapes_p0/tpcds_sf100/shape/query41.out    |   2 +-
 .../new_shapes_p0/tpcds_sf1000/shape/query41.out   |   2 +-
 .../subquery/correlated_scalar_subquery.groovy     | 223 ++++++++++++++++++
 55 files changed, 1094 insertions(+), 152 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index 4ab4165a446..9c3d3ebecba 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -188,7 +188,12 @@ public class Rewriter extends AbstractBatchJobExecutor {
                         // after doing NormalizeAggregate in analysis job
                         // we need run the following 2 rules to make 
AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION work
                         bottomUp(new PullUpProjectUnderApply()),
-                        topDown(new PushDownFilterThroughProject()),
+                        topDown(
+                                new PushDownFilterThroughProject(),
+                                // the subquery may have where and having 
clause
+                                // so there may be two filters we need to 
merge them
+                                new MergeFilters()
+                        ),
                         custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION,
                                 AggScalarSubQueryToWindowFunction::new),
                         bottomUp(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java
index 864e817dc1f..671abc2c490 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PushDownFilterThroughProject.java
@@ -32,7 +32,8 @@ public class PushDownFilterThroughProject extends 
PlanPostProcessor {
     public Plan visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, 
CascadesContext context) {
         filter = (PhysicalFilter<? extends Plan>) super.visit(filter, context);
         Plan child = filter.child();
-        if (!(child instanceof PhysicalProject)) {
+        // don't push down filter if child project contains NoneMovableFunction
+        if (!(child instanceof PhysicalProject) || ((PhysicalProject) 
child).containsNoneMovableFunction()) {
             return filter;
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java
index 4504b92fc7f..62881a463d1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/Validator.java
@@ -61,7 +61,8 @@ public class Validator extends PlanPostProcessor {
 
         Plan child = filter.child();
         // Forbidden filter-project, we must make filter-project -> 
project-filter.
-        if (child instanceof PhysicalProject) {
+        // except that the project contains NoneMovableFunction
+        if (child instanceof PhysicalProject && !((PhysicalProject<?>) 
child).containsNoneMovableFunction()) {
             throw new AnalysisException(
                     "Nereids generate a filter-project plan, but backend not 
support:\n" + filter.treeString());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java
index 8624ba205c5..49789aa66e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/ExpressionAnalyzer.java
@@ -65,15 +65,12 @@ import org.apache.doris.nereids.trees.expressions.Variable;
 import org.apache.doris.nereids.trees.expressions.WhenClause;
 import org.apache.doris.nereids.trees.expressions.functions.BoundFunction;
 import org.apache.doris.nereids.trees.expressions.functions.FunctionBuilder;
-import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.Lambda;
-import org.apache.doris.nereids.trees.expressions.functions.scalar.Nvl;
 import 
org.apache.doris.nereids.trees.expressions.functions.udf.AliasUdfBuilder;
 import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdaf;
 import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdf;
 import org.apache.doris.nereids.trees.expressions.functions.udf.UdfBuilder;
-import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
 import org.apache.doris.nereids.trees.expressions.literal.IntegerLikeLiteral;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
@@ -425,18 +422,6 @@ public class ExpressionAnalyzer extends 
SubExprAnalyzer<ExpressionRewriteContext
             return buildResult.first;
         } else {
             Expression castFunction = 
TypeCoercionUtils.processBoundFunction((BoundFunction) buildResult.first);
-            if (castFunction instanceof Count
-                    && context != null
-                    && context.cascadesContext.getOuterScope().isPresent()
-                    && 
!context.cascadesContext.getOuterScope().get().getCorrelatedSlots().isEmpty()) {
-                // consider sql: SELECT * FROM t1 WHERE t1.a <= (SELECT 
COUNT(t2.a) FROM t2 WHERE (t1.b = t2.b));
-                // when unnest correlated subquery, we create a left join node.
-                // outer query is left table and subquery is right one
-                // if there is no match, the row from right table is filled 
with nulls
-                // but COUNT function is always not nullable.
-                // so wrap COUNT with Nvl to ensure it's result is 0 instead 
of null to get the correct result
-                castFunction = new Nvl(castFunction, new BigIntLiteral(0));
-            }
             return castFunction;
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/FillUpMissingSlots.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/FillUpMissingSlots.java
index f78beb130e5..c55ed5957ba 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/FillUpMissingSlots.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/FillUpMissingSlots.java
@@ -18,6 +18,7 @@
 package org.apache.doris.nereids.rules.analysis;
 
 import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.analyzer.Scope;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.rules.Rule;
@@ -52,6 +53,8 @@ import java.util.stream.Collectors;
  * Resolve having clause to the aggregation/repeat.
  * need Top to Down to traverse plan,
  * because we need to process FILL_UP_SORT_HAVING_AGGREGATE before 
FILL_UP_HAVING_AGGREGATE.
+ * be aware that when filling up the missing slots, we should exclude outer 
query's correlated slots.
+ * because these correlated slots belong to outer query, so should not try to 
find them in child node.
  */
 public class FillUpMissingSlots implements AnalysisRuleFactory {
     @Override
@@ -59,14 +62,18 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
         return ImmutableList.of(
             RuleType.FILL_UP_SORT_PROJECT.build(
                 logicalSort(logicalProject())
-                    .then(sort -> {
+                    .thenApply(ctx -> {
+                        LogicalSort<LogicalProject<Plan>> sort = ctx.root;
+                        Optional<Scope> outerScope = 
ctx.cascadesContext.getOuterScope();
                         LogicalProject<Plan> project = sort.child();
                         Set<Slot> projectOutputSet = project.getOutputSet();
                         Set<Slot> notExistedInProject = 
sort.getOrderKeys().stream()
                                 .map(OrderKey::getExpr)
                                 .map(Expression::getInputSlots)
                                 .flatMap(Set::stream)
-                                .filter(s -> !projectOutputSet.contains(s))
+                                .filter(s -> !projectOutputSet.contains(s)
+                                        && (!outerScope.isPresent() || 
!outerScope.get()
+                                                
.getCorrelatedSlots().contains(s)))
                                 .collect(Collectors.toSet());
                         if (notExistedInProject.isEmpty()) {
                             return null;
@@ -82,7 +89,9 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
                     aggregate(logicalHaving(aggregate()))
                         .when(a -> 
a.getOutputExpressions().stream().allMatch(SlotReference.class::isInstance))
                 ).when(this::checkSort)
-                    .then(sort -> processDistinctProjectWithAggregate(sort, 
sort.child(), sort.child().child().child()))
+                    .thenApply(ctx -> 
processDistinctProjectWithAggregate(ctx.root,
+                            ctx.root.child(), ctx.root.child().child().child(),
+                            ctx.cascadesContext.getOuterScope()))
             ),
             // ATTN: process aggregate with distinct project, must run this 
rule before FILL_UP_SORT_AGGREGATE
             //   because this pattern will always fail in 
FILL_UP_SORT_AGGREGATE
@@ -91,14 +100,17 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
                     aggregate(aggregate())
                         .when(a -> 
a.getOutputExpressions().stream().allMatch(SlotReference.class::isInstance))
                 ).when(this::checkSort)
-                    .then(sort -> processDistinctProjectWithAggregate(sort, 
sort.child(), sort.child().child()))
+                    .thenApply(ctx -> 
processDistinctProjectWithAggregate(ctx.root,
+                            ctx.root.child(), ctx.root.child().child(),
+                            ctx.cascadesContext.getOuterScope()))
             ),
             RuleType.FILL_UP_SORT_AGGREGATE.build(
                 logicalSort(aggregate())
                     .when(this::checkSort)
-                    .then(sort -> {
+                    .thenApply(ctx -> {
+                        LogicalSort<Aggregate<Plan>> sort = ctx.root;
                         Aggregate<Plan> agg = sort.child();
-                        Resolver resolver = new Resolver(agg);
+                        Resolver resolver = new Resolver(agg, 
ctx.cascadesContext.getOuterScope());
                         sort.getExpressions().forEach(resolver::resolve);
                         return createPlan(resolver, agg, (r, a) -> {
                             List<OrderKey> newOrderKeys = 
sort.getOrderKeys().stream()
@@ -118,10 +130,11 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
             RuleType.FILL_UP_SORT_HAVING_AGGREGATE.build(
                 logicalSort(logicalHaving(aggregate()))
                     .when(this::checkSort)
-                    .then(sort -> {
+                    .thenApply(ctx -> {
+                        LogicalSort<LogicalHaving<Aggregate<Plan>>> sort = 
ctx.root;
                         LogicalHaving<Aggregate<Plan>> having = sort.child();
                         Aggregate<Plan> agg = having.child();
-                        Resolver resolver = new Resolver(agg);
+                        Resolver resolver = new Resolver(agg, 
ctx.cascadesContext.getOuterScope());
                         sort.getExpressions().forEach(resolver::resolve);
                         return createPlan(resolver, agg, (r, a) -> {
                             List<OrderKey> newOrderKeys = 
sort.getOrderKeys().stream()
@@ -138,13 +151,17 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
                     })
             ),
             RuleType.FILL_UP_SORT_HAVING_PROJECT.build(
-                    logicalSort(logicalHaving(logicalProject())).then(sort -> {
+                    logicalSort(logicalHaving(logicalProject())).thenApply(ctx 
-> {
+                        LogicalSort<LogicalHaving<LogicalProject<Plan>>> sort 
= ctx.root;
+                        Optional<Scope> outerScope = 
ctx.cascadesContext.getOuterScope();
                         Set<Slot> childOutput = sort.child().getOutputSet();
                         Set<Slot> notExistedInProject = 
sort.getOrderKeys().stream()
                                 .map(OrderKey::getExpr)
                                 .map(Expression::getInputSlots)
                                 .flatMap(Set::stream)
-                                .filter(s -> !childOutput.contains(s))
+                                .filter(s -> !childOutput.contains(s)
+                                        && (!outerScope.isPresent() || 
!outerScope.get()
+                                                
.getCorrelatedSlots().contains(s)))
                                 .collect(Collectors.toSet());
                         if (notExistedInProject.isEmpty()) {
                             return null;
@@ -158,9 +175,10 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
                     })
             ),
             RuleType.FILL_UP_HAVING_AGGREGATE.build(
-                logicalHaving(aggregate()).then(having -> {
+                logicalHaving(aggregate()).thenApply(ctx -> {
+                    LogicalHaving<Aggregate<Plan>> having = ctx.root;
                     Aggregate<Plan> agg = having.child();
-                    Resolver resolver = new Resolver(agg);
+                    Resolver resolver = new Resolver(agg, 
ctx.cascadesContext.getOuterScope());
                     having.getConjuncts().forEach(resolver::resolve);
                     return createPlan(resolver, agg, (r, a) -> {
                         Set<Expression> newConjuncts = ExpressionUtils.replace(
@@ -175,7 +193,9 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
             ),
             // Convert having to filter
             RuleType.FILL_UP_HAVING_PROJECT.build(
-                    logicalHaving(logicalProject()).then(having -> {
+                    logicalHaving(logicalProject()).thenApply(ctx -> {
+                        LogicalHaving<LogicalProject<Plan>> having = ctx.root;
+                        Optional<Scope> outerScope = 
ctx.cascadesContext.getOuterScope();
                         if (having.getExpressions().stream().anyMatch(e -> 
e.containsType(AggregateFunction.class))) {
                             // This is very weird pattern.
                             // There are some aggregate functions in having, 
but its child is project.
@@ -198,7 +218,7 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
                                     ImmutableList.of(), ImmutableList.of(), 
project.child());
                             // avoid throw exception even if having have slot 
from its child.
                             // because we will add a project between having 
and project.
-                            Resolver resolver = new Resolver(agg, false);
+                            Resolver resolver = new Resolver(agg, false, 
outerScope);
                             having.getConjuncts().forEach(resolver::resolve);
                             agg = 
agg.withAggOutput(resolver.getNewOutputSlots());
                             Set<Expression> newConjuncts = 
ExpressionUtils.replace(
@@ -212,7 +232,9 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
                             Set<Slot> notExistedInProject = 
having.getExpressions().stream()
                                     .map(Expression::getInputSlots)
                                     .flatMap(Set::stream)
-                                    .filter(s -> !projectOutputSet.contains(s))
+                                    .filter(s -> !projectOutputSet.contains(s)
+                                            && (!outerScope.isPresent() || 
!outerScope.get()
+                                                    
.getCorrelatedSlots().contains(s)))
                                     .collect(Collectors.toSet());
                             if (notExistedInProject.isEmpty()) {
                                 return null;
@@ -235,18 +257,28 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
         private final List<NamedExpression> newOutputSlots = 
Lists.newArrayList();
         private final Map<Slot, Expression> outputSubstitutionMap;
         private final boolean checkSlot;
+        private final Optional<Scope> outerScope;
 
-        Resolver(Aggregate<?> aggregate, boolean checkSlot) {
+        Resolver(Aggregate<?> aggregate, boolean checkSlot, Optional<Scope> 
outerScope) {
             outputExpressions = aggregate.getOutputExpressions();
             groupByExpressions = aggregate.getGroupByExpressions();
             outputSubstitutionMap = 
outputExpressions.stream().filter(Alias.class::isInstance)
                     .collect(Collectors.toMap(NamedExpression::toSlot, alias 
-> alias.child(0),
                             (k1, k2) -> k1));
             this.checkSlot = checkSlot;
+            this.outerScope = outerScope;
+        }
+
+        Resolver(Aggregate<?> aggregate, boolean checkSlot) {
+            this(aggregate, checkSlot, Optional.empty());
         }
 
         Resolver(Aggregate<?> aggregate) {
-            this(aggregate, true);
+            this(aggregate, true, Optional.empty());
+        }
+
+        Resolver(Aggregate<?> aggregate, Optional<Scope> outerScope) {
+            this(aggregate, true, outerScope);
         }
 
         public void resolve(Expression expression) {
@@ -274,7 +306,8 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
                 // We couldn't find the equivalent expression in output 
expressions and group-by expressions,
                 // so we should check whether the expression is valid.
                 if (expression instanceof SlotReference) {
-                    if (checkSlot) {
+                    if (checkSlot && (!outerScope.isPresent()
+                            || 
!outerScope.get().getCorrelatedSlots().contains(expression))) {
                         throw new AnalysisException(expression.toSql() + " 
should be grouped by.");
                     }
                 } else if (expression instanceof AggregateFunction) {
@@ -401,8 +434,8 @@ public class FillUpMissingSlots implements 
AnalysisRuleFactory {
      * @return filled up plan
      */
     private Plan processDistinctProjectWithAggregate(LogicalSort<?> sort,
-            Aggregate<?> upperAggregate, Aggregate<Plan> bottomAggregate) {
-        Resolver resolver = new Resolver(bottomAggregate);
+            Aggregate<?> upperAggregate, Aggregate<Plan> bottomAggregate, 
Optional<Scope> outerScope) {
+        Resolver resolver = new Resolver(bottomAggregate, outerScope);
         sort.getExpressions().forEach(resolver::resolve);
         return createPlan(resolver, bottomAggregate, (r, a) -> {
             List<OrderKey> newOrderKeys = sort.getOrderKeys().stream()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java
index 7bfd5256f6a..7b0ed457082 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java
@@ -32,21 +32,29 @@ import 
org.apache.doris.nereids.trees.expressions.SubqueryExpr;
 import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
 import 
org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * Use the visitor to iterate sub expression.
@@ -114,22 +122,52 @@ class SubExprAnalyzer<T> extends 
DefaultExpressionRewriter<T> {
     @Override
     public Expression visitScalarSubquery(ScalarSubquery scalar, T context) {
         AnalyzedResult analyzedResult = analyzeSubquery(scalar);
+        boolean isCorrelated = analyzedResult.isCorrelated();
+        LogicalPlan analyzedSubqueryPlan = analyzedResult.logicalPlan;
+        checkOutputColumn(analyzedSubqueryPlan);
+        if (isCorrelated) {
+            if (analyzedSubqueryPlan instanceof LogicalLimit) {
+                LogicalLimit limit = (LogicalLimit) analyzedSubqueryPlan;
+                if (limit.getOffset() == 0 && limit.getLimit() == 1) {
+                    // skip useless limit node
+                    analyzedResult = new AnalyzedResult((LogicalPlan) 
analyzedSubqueryPlan.child(0),
+                            analyzedResult.correlatedSlots);
+                } else {
+                    throw new AnalysisException("limit is not supported in 
correlated subquery "
+                            + analyzedResult.getLogicalPlan());
+                }
+            }
+            if (analyzedSubqueryPlan instanceof LogicalSort) {
+                // skip useless sort node
+                analyzedResult = new AnalyzedResult((LogicalPlan) 
analyzedSubqueryPlan.child(0),
+                        analyzedResult.correlatedSlots);
+            }
+            CorrelatedSlotsValidator validator =
+                    new 
CorrelatedSlotsValidator(ImmutableSet.copyOf(analyzedResult.correlatedSlots));
+            List<PlanNodeCorrelatedInfo> nodeInfoList = new ArrayList<>(16);
+            Set<LogicalAggregate> topAgg = new HashSet<>();
+            validateSubquery(analyzedResult.logicalPlan, validator, 
nodeInfoList, topAgg);
+        }
 
-        checkOutputColumn(analyzedResult.getLogicalPlan());
-        checkHasAgg(analyzedResult);
-        checkHasNoGroupBy(analyzedResult);
-
-        // if scalar subquery is like select '2024-02-02 00:00:00'
-        // we can just return the constant expr '2024-02-02 00:00:00'
         if (analyzedResult.getLogicalPlan() instanceof LogicalProject) {
             LogicalProject project = (LogicalProject) 
analyzedResult.getLogicalPlan();
             if (project.child() instanceof LogicalOneRowRelation
                     && project.getProjects().size() == 1
                     && project.getProjects().get(0) instanceof Alias) {
+                // if scalar subquery is like select '2024-02-02 00:00:00'
+                // we can just return the constant expr '2024-02-02 00:00:00'
                 Alias alias = (Alias) project.getProjects().get(0);
                 if (alias.isConstant()) {
                     return alias.child();
                 }
+            } else if (isCorrelated) {
+                Set<Slot> correlatedSlots = new 
HashSet<>(analyzedResult.getCorrelatedSlots());
+                if 
(!Sets.intersection(ExpressionUtils.getInputSlotSet(project.getProjects()),
+                        correlatedSlots).isEmpty()) {
+                    throw new AnalysisException(
+                            "outer query's column is not supported in 
subquery's output "
+                                    + analyzedResult.getLogicalPlan());
+                }
             }
         }
 
@@ -143,27 +181,6 @@ class SubExprAnalyzer<T> extends 
DefaultExpressionRewriter<T> {
         }
     }
 
-    private void checkHasAgg(AnalyzedResult analyzedResult) {
-        if (!analyzedResult.isCorrelated()) {
-            return;
-        }
-        if (!analyzedResult.hasAgg()) {
-            throw new AnalysisException("The select item in correlated 
subquery of binary predicate "
-                    + "should only be sum, min, max, avg and count. Current 
subquery: "
-                    + analyzedResult.getLogicalPlan());
-        }
-    }
-
-    private void checkHasNoGroupBy(AnalyzedResult analyzedResult) {
-        if (!analyzedResult.isCorrelated()) {
-            return;
-        }
-        if (analyzedResult.hasGroupBy()) {
-            throw new AnalysisException("Unsupported correlated subquery with 
grouping and/or aggregation "
-                    + analyzedResult.getLogicalPlan());
-        }
-    }
-
     private void checkNoCorrelatedSlotsUnderAgg(AnalyzedResult analyzedResult) 
{
         if (analyzedResult.hasCorrelatedSlotsUnderAgg()) {
             throw new AnalysisException(
@@ -230,30 +247,19 @@ class SubExprAnalyzer<T> extends 
DefaultExpressionRewriter<T> {
             return !correlatedSlots.isEmpty();
         }
 
-        public boolean hasAgg() {
-            return logicalPlan.anyMatch(LogicalAggregate.class::isInstance);
-        }
-
-        public boolean hasGroupBy() {
-            if (hasAgg()) {
-                return !((LogicalAggregate)
-                        ((ImmutableSet) 
logicalPlan.collect(LogicalAggregate.class::isInstance)).asList().get(0))
-                        .getGroupByExpressions().isEmpty();
-            }
-            return false;
-        }
-
         public boolean hasCorrelatedSlotsUnderAgg() {
             return correlatedSlots.isEmpty() ? false
-                    : findAggContainsCorrelatedSlots(logicalPlan, 
ImmutableSet.copyOf(correlatedSlots));
+                    : hasCorrelatedSlotsUnderNode(logicalPlan,
+                            ImmutableSet.copyOf(correlatedSlots), 
LogicalAggregate.class);
         }
 
-        private boolean findAggContainsCorrelatedSlots(Plan rootPlan, 
ImmutableSet<Slot> slots) {
+        private static <T> boolean hasCorrelatedSlotsUnderNode(Plan rootPlan,
+                                                               
ImmutableSet<Slot> slots, Class<T> clazz) {
             ArrayDeque<Plan> planQueue = new ArrayDeque<>();
             planQueue.add(rootPlan);
             while (!planQueue.isEmpty()) {
                 Plan plan = planQueue.poll();
-                if (plan instanceof LogicalAggregate) {
+                if (plan.getClass().equals(clazz)) {
                     if (plan.containsSlots(slots)) {
                         return true;
                     }
@@ -278,4 +284,171 @@ class SubExprAnalyzer<T> extends 
DefaultExpressionRewriter<T> {
             return logicalPlan instanceof LogicalLimit && ((LogicalLimit<?>) 
logicalPlan).getLimit() == 0;
         }
     }
+
+    private static class PlanNodeCorrelatedInfo {
+        private PlanType planType;
+        private boolean containCorrelatedSlots;
+        private boolean hasGroupBy;
+        private LogicalAggregate aggregate;
+
+        public PlanNodeCorrelatedInfo(PlanType planType, boolean 
containCorrelatedSlots) {
+            this(planType, containCorrelatedSlots, null);
+        }
+
+        public PlanNodeCorrelatedInfo(PlanType planType, boolean 
containCorrelatedSlots,
+                LogicalAggregate aggregate) {
+            this.planType = planType;
+            this.containCorrelatedSlots = containCorrelatedSlots;
+            this.aggregate = aggregate;
+            this.hasGroupBy = aggregate != null ? 
!aggregate.getGroupByExpressions().isEmpty() : false;
+        }
+    }
+
+    private static class CorrelatedSlotsValidator
+            extends PlanVisitor<PlanNodeCorrelatedInfo, Void> {
+        private final ImmutableSet<Slot> correlatedSlots;
+
+        public CorrelatedSlotsValidator(ImmutableSet<Slot> correlatedSlots) {
+            this.correlatedSlots = correlatedSlots;
+        }
+
+        @Override
+        public PlanNodeCorrelatedInfo visit(Plan plan, Void context) {
+            return new PlanNodeCorrelatedInfo(plan.getType(), 
findCorrelatedSlots(plan));
+        }
+
+        public PlanNodeCorrelatedInfo visitLogicalProject(LogicalProject plan, 
Void context) {
+            boolean containCorrelatedSlots = findCorrelatedSlots(plan);
+            if (containCorrelatedSlots) {
+                throw new AnalysisException(
+                        String.format("access outer query's column in project 
is not supported",
+                                correlatedSlots));
+            } else {
+                PlanType planType = ExpressionUtils.containsWindowExpression(
+                        ((LogicalProject<?>) plan).getProjects()) ? 
PlanType.LOGICAL_WINDOW : plan.getType();
+                return new PlanNodeCorrelatedInfo(planType, false);
+            }
+        }
+
+        public PlanNodeCorrelatedInfo visitLogicalAggregate(LogicalAggregate 
plan, Void context) {
+            boolean containCorrelatedSlots = findCorrelatedSlots(plan);
+            if (containCorrelatedSlots) {
+                throw new AnalysisException(
+                        String.format("access outer query's column in 
aggregate is not supported",
+                                correlatedSlots, plan));
+            } else {
+                return new PlanNodeCorrelatedInfo(plan.getType(), false, plan);
+            }
+        }
+
+        public PlanNodeCorrelatedInfo visitLogicalJoin(LogicalJoin plan, Void 
context) {
+            boolean containCorrelatedSlots = findCorrelatedSlots(plan);
+            if (containCorrelatedSlots) {
+                throw new AnalysisException(
+                        String.format("access outer query's column in join is 
not supported",
+                                correlatedSlots, plan));
+            } else {
+                return new PlanNodeCorrelatedInfo(plan.getType(), false);
+            }
+        }
+
+        public PlanNodeCorrelatedInfo visitLogicalSort(LogicalSort plan, Void 
context) {
+            boolean containCorrelatedSlots = findCorrelatedSlots(plan);
+            if (containCorrelatedSlots) {
+                throw new AnalysisException(
+                        String.format("access outer query's column in order by 
is not supported",
+                                correlatedSlots, plan));
+            } else {
+                return new PlanNodeCorrelatedInfo(plan.getType(), false);
+            }
+        }
+
+        private boolean findCorrelatedSlots(Plan plan) {
+            return plan.getExpressions().stream().anyMatch(expression -> !Sets
+                    .intersection(correlatedSlots, 
expression.getInputSlots()).isEmpty());
+        }
+    }
+
+    private LogicalAggregate validateNodeInfoList(List<PlanNodeCorrelatedInfo> 
nodeInfoList) {
+        LogicalAggregate topAggregate = null;
+        int size = nodeInfoList.size();
+        if (size > 0) {
+            List<PlanNodeCorrelatedInfo> correlatedNodes = new ArrayList<>(4);
+            boolean checkNodeTypeAfterCorrelatedNode = false;
+            boolean checkAfterAggNode = false;
+            for (int i = size - 1; i >= 0; --i) {
+                PlanNodeCorrelatedInfo nodeInfo = nodeInfoList.get(i);
+                if (checkNodeTypeAfterCorrelatedNode) {
+                    switch (nodeInfo.planType) {
+                        case LOGICAL_LIMIT:
+                            throw new AnalysisException(
+                                    "limit is not supported in correlated 
subquery");
+                        case LOGICAL_GENERATE:
+                            throw new AnalysisException(
+                                    "access outer query's column before 
lateral view is not supported");
+                        case LOGICAL_AGGREGATE:
+                            if (checkAfterAggNode) {
+                                throw new AnalysisException(
+                                        "access outer query's column before 
two agg nodes is not supported");
+                            }
+                            if (nodeInfo.hasGroupBy) {
+                                // TODO support later
+                                throw new AnalysisException(
+                                        "access outer query's column before 
agg with group by is not supported");
+                            }
+                            checkAfterAggNode = true;
+                            topAggregate = nodeInfo.aggregate;
+                            break;
+                        case LOGICAL_WINDOW:
+                            throw new AnalysisException(
+                                    "access outer query's column before window 
function is not supported");
+                        case LOGICAL_JOIN:
+                            throw new AnalysisException(
+                                    "access outer query's column before join 
is not supported");
+                        case LOGICAL_SORT:
+                            // allow any sort node, the sort node will be 
removed by ELIMINATE_ORDER_BY_UNDER_SUBQUERY
+                            break;
+                        case LOGICAL_PROJECT:
+                            // allow any project node
+                            break;
+                        case LOGICAL_SUBQUERY_ALIAS:
+                            // allow any subquery alias
+                            break;
+                        default:
+                            if (checkAfterAggNode) {
+                                throw new AnalysisException(
+                                        "only project, sort and subquery alias 
node is allowed after agg node");
+                            }
+                            break;
+                    }
+                }
+                if (nodeInfo.containCorrelatedSlots) {
+                    correlatedNodes.add(nodeInfo);
+                    checkNodeTypeAfterCorrelatedNode = true;
+                }
+            }
+
+            // only support 1 correlated node for now
+            if (correlatedNodes.size() > 1) {
+                throw new AnalysisException(
+                        "access outer query's column in two places is not 
supported");
+            }
+        }
+        return topAggregate;
+    }
+
+    private void validateSubquery(Plan plan, CorrelatedSlotsValidator 
validator,
+            List<PlanNodeCorrelatedInfo> nodeInfoList, Set<LogicalAggregate> 
topAgg) {
+        nodeInfoList.add(plan.accept(validator, null));
+        for (Plan child : plan.children()) {
+            validateSubquery(child, validator, nodeInfoList, topAgg);
+        }
+        if (plan.children().isEmpty()) {
+            LogicalAggregate topAggNode = validateNodeInfoList(nodeInfoList);
+            if (topAggNode != null) {
+                topAgg.add(topAggNode);
+            }
+        }
+        nodeInfoList.remove(nodeInfoList.size() - 1);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubqueryToApply.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubqueryToApply.java
index cfc5b2ba24a..17e7d098cad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubqueryToApply.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubqueryToApply.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.rules.analysis;
 
+import org.apache.doris.common.Pair;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.rules.Rule;
@@ -30,6 +31,8 @@ import 
org.apache.doris.nereids.trees.expressions.BinaryOperator;
 import org.apache.doris.nereids.trees.expressions.Exists;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.InSubquery;
+import org.apache.doris.nereids.trees.expressions.IsNull;
+import org.apache.doris.nereids.trees.expressions.LessThanEqual;
 import org.apache.doris.nereids.trees.expressions.ListQuery;
 import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
@@ -39,8 +42,14 @@ import 
org.apache.doris.nereids.trees.expressions.ScalarSubquery;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
+import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
+import org.apache.doris.nereids.trees.expressions.functions.agg.AnyValue;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.AssertTrue;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.Nvl;
 import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
 import 
org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
@@ -54,10 +63,12 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalSort;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.Utils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -124,14 +135,14 @@ public class SubqueryToApply implements 
AnalysisRuleFactory {
                                                 
TrySimplifyPredicateWithMarkJoinSlot.INSTANCE.rewrite(conjunct,
                                                         rewriteContext), 
rewriteContext)
                                         : false;
-
-                        applyPlan = subqueryToApply(subqueryExprs.stream()
+                        Pair<LogicalPlan, Optional<Expression>> result = 
subqueryToApply(subqueryExprs.stream()
                                     .collect(ImmutableList.toImmutableList()), 
tmpPlan,
                                 context.getSubqueryToMarkJoinSlot(),
                                 ctx.cascadesContext,
                                 Optional.of(conjunct), false, 
isMarkSlotNotNull);
+                        applyPlan = result.first;
                         tmpPlan = applyPlan;
-                        newConjuncts.add(conjunct);
+                        newConjuncts.add(result.second.isPresent() ? 
result.second.get() : conjunct);
                     }
                     Plan newFilter = new LogicalFilter<>(newConjuncts.build(), 
applyPlan);
                     return new 
LogicalProject<>(filter.getOutput().stream().collect(ImmutableList.toImmutableList()),
@@ -167,13 +178,15 @@ public class SubqueryToApply implements 
AnalysisRuleFactory {
                     Expression newProject =
                             replaceSubquery.replace(oldProjects.get(i), 
context);
 
-                    applyPlan = subqueryToApply(
-                            Utils.fastToImmutableList(subqueryExprs),
-                            childPlan, context.getSubqueryToMarkJoinSlot(),
-                            ctx.cascadesContext,
-                            Optional.of(newProject), true, false);
+                    Pair<LogicalPlan, Optional<Expression>> result =
+                            
subqueryToApply(Utils.fastToImmutableList(subqueryExprs), childPlan,
+                                    context.getSubqueryToMarkJoinSlot(), 
ctx.cascadesContext,
+                                    Optional.of(newProject), true, false);
+                    applyPlan = result.first;
                     childPlan = applyPlan;
-                    newProjects.add((NamedExpression) newProject);
+                    newProjects.add(
+                            result.second.isPresent() ? (NamedExpression) 
result.second.get()
+                                    : (NamedExpression) newProject);
                 }
 
                 return project.withProjectsAndChild(newProjects.build(), 
childPlan);
@@ -248,17 +261,18 @@ public class SubqueryToApply implements 
AnalysisRuleFactory {
                                     
TrySimplifyPredicateWithMarkJoinSlot.INSTANCE.rewrite(conjunct, rewriteContext),
                                     rewriteContext)
                                 : false;
-                        applyPlan = subqueryToApply(
+                        Pair<LogicalPlan, Optional<Expression>> result = 
subqueryToApply(
                                 
subqueryExprs.stream().collect(ImmutableList.toImmutableList()),
                                 relatedInfoList.get(i) == 
RelatedInfo.RelatedToLeft ? leftChildPlan : rightChildPlan,
                                 context.getSubqueryToMarkJoinSlot(),
                                 ctx.cascadesContext, Optional.of(conjunct), 
false, isMarkSlotNotNull);
+                        applyPlan = result.first;
                         if (relatedInfoList.get(i) == 
RelatedInfo.RelatedToLeft) {
                             leftChildPlan = applyPlan;
                         } else {
                             rightChildPlan = applyPlan;
                         }
-                        newConjuncts.add(conjunct);
+                        newConjuncts.add(result.second.isPresent() ? 
result.second.get() : conjunct);
                     }
                     List<Expression> simpleConjuncts = 
joinConjuncts.get(false);
                     if (simpleConjuncts != null) {
@@ -350,12 +364,12 @@ public class SubqueryToApply implements 
AnalysisRuleFactory {
         return correlatedInfoList.build();
     }
 
-    private LogicalPlan subqueryToApply(List<SubqueryExpr> subqueryExprs, 
LogicalPlan childPlan,
-                                        Map<SubqueryExpr, 
Optional<MarkJoinSlotReference>> subqueryToMarkJoinSlot,
-                                        CascadesContext ctx,
-                                        Optional<Expression> conjunct, boolean 
isProject,
-                                        boolean isMarkJoinSlotNotNull) {
-        LogicalPlan tmpPlan = childPlan;
+    private Pair<LogicalPlan, Optional<Expression>> subqueryToApply(
+            List<SubqueryExpr> subqueryExprs, LogicalPlan childPlan,
+            Map<SubqueryExpr, Optional<MarkJoinSlotReference>> 
subqueryToMarkJoinSlot,
+            CascadesContext ctx, Optional<Expression> conjunct, boolean 
isProject,
+            boolean isMarkJoinSlotNotNull) {
+        Pair<LogicalPlan, Optional<Expression>> tmpPlan = Pair.of(childPlan, 
conjunct);
         for (int i = 0; i < subqueryExprs.size(); ++i) {
             SubqueryExpr subqueryExpr = subqueryExprs.get(i);
             if (subqueryExpr instanceof Exists && 
hasTopLevelScalarAgg(subqueryExpr.getQueryPlan())) {
@@ -366,7 +380,7 @@ public class SubqueryToApply implements AnalysisRuleFactory 
{
             }
 
             if (!ctx.subqueryIsAnalyzed(subqueryExpr)) {
-                tmpPlan = addApply(subqueryExpr, tmpPlan,
+                tmpPlan = addApply(subqueryExpr, tmpPlan.first,
                     subqueryToMarkJoinSlot, ctx, conjunct,
                     isProject, subqueryExprs.size() == 1, 
isMarkJoinSlotNotNull);
             }
@@ -383,32 +397,108 @@ public class SubqueryToApply implements 
AnalysisRuleFactory {
         return false;
     }
 
-    private LogicalPlan addApply(SubqueryExpr subquery, LogicalPlan childPlan,
-                                 Map<SubqueryExpr, 
Optional<MarkJoinSlotReference>> subqueryToMarkJoinSlot,
-                                 CascadesContext ctx, Optional<Expression> 
conjunct,
-                                 boolean isProject, boolean singleSubquery, 
boolean isMarkJoinSlotNotNull) {
+    private Pair<LogicalPlan, Optional<Expression>> addApply(SubqueryExpr 
subquery,
+            LogicalPlan childPlan,
+            Map<SubqueryExpr, Optional<MarkJoinSlotReference>> 
subqueryToMarkJoinSlot,
+            CascadesContext ctx, Optional<Expression> conjunct, boolean 
isProject,
+            boolean singleSubquery, boolean isMarkJoinSlotNotNull) {
         ctx.setSubqueryExprIsAnalyzed(subquery, true);
+        Optional<MarkJoinSlotReference> markJoinSlot = 
subqueryToMarkJoinSlot.get(subquery);
         boolean needAddScalarSubqueryOutputToProjects = 
isConjunctContainsScalarSubqueryOutput(
                 subquery, conjunct, isProject, singleSubquery);
+        boolean needRuntimeAssertCount = false;
+        NamedExpression oldSubqueryOutput = 
subquery.getQueryPlan().getOutput().get(0);
+        Slot countSlot = null;
+        Slot anyValueSlot = null;
+        Optional<Expression> newConjunct = conjunct;
+        if (needAddScalarSubqueryOutputToProjects && subquery instanceof 
ScalarSubquery
+                && !subquery.getCorrelateSlots().isEmpty()) {
+            if (((ScalarSubquery) subquery).hasTopLevelScalarAgg()) {
+                // consider sql: SELECT * FROM t1 WHERE t1.a <= (SELECT 
COUNT(t2.a) FROM t2 WHERE (t1.b = t2.b));
+                // when unnest correlated subquery, we create a left join node.
+                // outer query is left table and subquery is right one
+                // if there is no match, the row from right table is filled 
with nulls
+                // but COUNT function is always not nullable.
+                // so wrap COUNT with Nvl to ensure its result is 0 instead of 
null to get the correct result
+                if (conjunct.isPresent()) {
+                    Map<Expression, Expression> replaceMap = new HashMap<>();
+                    NamedExpression agg = ((ScalarSubquery) 
subquery).getTopLevelScalarAggFunction().get();
+                    if (agg instanceof Alias) {
+                        if (((Alias) agg).child() instanceof 
AlwaysNotNullable) {
+                            AlwaysNotNullable notNullableAggFunc =
+                                    (AlwaysNotNullable) ((Alias) agg).child();
+                            if (subquery.getQueryPlan() instanceof 
LogicalProject) {
+                                LogicalProject logicalProject =
+                                        (LogicalProject) 
subquery.getQueryPlan();
+                                
Preconditions.checkState(logicalProject.getOutputs().size() == 1,
+                                        "Scalar subuqery's should only output 
1 column");
+                                Slot aggSlot = agg.toSlot();
+                                replaceMap.put(aggSlot, new Alias(new 
Nvl(aggSlot,
+                                        
notNullableAggFunc.resultForEmptyInput())));
+                                NamedExpression newOutput = (NamedExpression) 
ExpressionUtils
+                                        .replace((NamedExpression) 
logicalProject.getProjects().get(0), replaceMap);
+                                replaceMap.clear();
+                                replaceMap.put(oldSubqueryOutput, 
newOutput.toSlot());
+                                oldSubqueryOutput = newOutput;
+                                subquery = subquery.withSubquery((LogicalPlan) 
logicalProject.child());
+                            } else {
+                                replaceMap.put(oldSubqueryOutput, new 
Nvl(oldSubqueryOutput,
+                                        
notNullableAggFunc.resultForEmptyInput()));
+                            }
+                        }
+                        if (!replaceMap.isEmpty()) {
+                            newConjunct = 
Optional.of(ExpressionUtils.replace(conjunct.get(), replaceMap));
+                        }
+                    }
+                }
+            } else {
+                // if scalar subquery doesn't have top level scalar agg we 
will create one, for example
+                // select (select t2.c1 from t2 where t2.c2 = t1.c2) from t1;
+                // the original output of the correlate subquery is t2.c1, 
after adding a scalar agg, it will be
+                // select (select count(*), any_value(t2.c1) from t2 where 
t2.c2 = t1.c2) from t1;
+                Alias countAlias = new Alias(new Count());
+                Alias anyValueAlias = new Alias(new 
AnyValue(oldSubqueryOutput));
+                LogicalAggregate<Plan> aggregate = new 
LogicalAggregate<>(ImmutableList.of(),
+                        ImmutableList.of(countAlias, anyValueAlias), 
subquery.getQueryPlan());
+                countSlot = countAlias.toSlot();
+                anyValueSlot = anyValueAlias.toSlot();
+                subquery = subquery.withSubquery(aggregate);
+                if (conjunct.isPresent()) {
+                    Map<Expression, Expression> replaceMap = new HashMap<>();
+                    replaceMap.put(oldSubqueryOutput, anyValueSlot);
+                    newConjunct = 
Optional.of(ExpressionUtils.replace(conjunct.get(), replaceMap));
+                }
+                needRuntimeAssertCount = true;
+            }
+        }
         LogicalApply newApply = new LogicalApply(
                 subquery.getCorrelateSlots(),
                 subquery, Optional.empty(),
-                subqueryToMarkJoinSlot.get(subquery),
+                markJoinSlot,
                 needAddScalarSubqueryOutputToProjects, isProject, 
isMarkJoinSlotNotNull,
                 childPlan, subquery.getQueryPlan());
 
-        List<NamedExpression> projects = 
ImmutableList.<NamedExpression>builder()
-                    // left child
-                    .addAll(childPlan.getOutput())
-                    // markJoinSlotReference
-                    .addAll(subqueryToMarkJoinSlot.get(subquery).isPresent()
-                        ? 
ImmutableList.of(subqueryToMarkJoinSlot.get(subquery).get()) : 
ImmutableList.of())
-                    // scalarSubquery output
-                    .addAll(needAddScalarSubqueryOutputToProjects
-                        ? 
ImmutableList.of(subquery.getQueryPlan().getOutput().get(0)) : 
ImmutableList.of())
-                    .build();
-
-        return new LogicalProject(projects, newApply);
+        ImmutableList.Builder<NamedExpression> projects =
+                
ImmutableList.builderWithExpectedSize(childPlan.getOutput().size() + 3);
+        // left child
+        projects.addAll(childPlan.getOutput());
+        // markJoinSlotReference
+        markJoinSlot.map(projects::add);
+        if (needAddScalarSubqueryOutputToProjects) {
+            if (needRuntimeAssertCount) {
+                // if we create a new subquery in previous step, we need add 
the any_value() and assert_true()
+                // into the project list. So BE will use assert_true to check 
if the subquery return only 1 row
+                projects.add(anyValueSlot);
+                projects.add(new Alias(new AssertTrue(
+                        ExpressionUtils.or(new IsNull(countSlot),
+                                new LessThanEqual(countSlot, new 
IntegerLiteral(1))),
+                        new VarcharLiteral("correlate scalar subquery must 
return only 1 row"))));
+            } else {
+                projects.add(oldSubqueryOutput);
+            }
+        }
+
+        return Pair.of(new LogicalProject(projects.build(), newApply), 
newConjunct);
     }
 
     private boolean isConjunctContainsScalarSubqueryOutput(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FoldConstantRuleOnFE.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FoldConstantRuleOnFE.java
index 4945123303c..b29694d5440 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FoldConstantRuleOnFE.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FoldConstantRuleOnFE.java
@@ -64,6 +64,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.scalar.Date;
 import 
org.apache.doris.nereids.trees.expressions.functions.scalar.EncryptKeyRef;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.If;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.LastQueryId;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.Nvl;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.Password;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.SessionUser;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.User;
@@ -171,7 +172,8 @@ public class FoldConstantRuleOnFE extends 
AbstractExpressionRewriteRule
                 matches(Date.class, this::visitDate),
                 matches(Version.class, this::visitVersion),
                 matches(SessionUser.class, this::visitSessionUser),
-                matches(LastQueryId.class, this::visitLastQueryId)
+                matches(LastQueryId.class, this::visitLastQueryId),
+                matches(Nvl.class, this::visitNvl)
         );
     }
 
@@ -644,6 +646,21 @@ public class FoldConstantRuleOnFE extends 
AbstractExpressionRewriteRule
         return new StringLiteral(GlobalVariable.version);
     }
 
+    @Override
+    public Expression visitNvl(Nvl nvl, ExpressionRewriteContext context) {
+        for (Expression expr : nvl.children()) {
+            if (expr.isLiteral()) {
+                if (!expr.isNullLiteral()) {
+                    return expr;
+                }
+            } else {
+                return nvl;
+            }
+        }
+        // all nulls
+        return nvl.child(0);
+    }
+
     private <E extends Expression> E rewriteChildren(E expr, 
ExpressionRewriteContext context) {
         if (!deepRewrite) {
             return expr;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpCorrelatedFilterUnderApplyAggregateProject.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpCorrelatedFilterUnderApplyAggregateProject.java
index 309bd9a78b9..c8cb9ebe8f5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpCorrelatedFilterUnderApplyAggregateProject.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpCorrelatedFilterUnderApplyAggregateProject.java
@@ -19,17 +19,22 @@ package org.apache.doris.nereids.rules.rewrite;
 
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.util.ExpressionUtils;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * Swap the order of project and filter under agg in correlated subqueries.
@@ -94,8 +99,22 @@ public class 
PullUpCorrelatedFilterUnderApplyAggregateProject implements Rewrite
             }
         });
 
-        LogicalProject<Plan> newProject = 
project.withProjectsAndChild(newProjects, filter.child());
-        LogicalFilter<Plan> newFilter = new 
LogicalFilter<>(filter.getConjuncts(), newProject);
+        Set<Slot> correlatedSlots = 
ExpressionUtils.getInputSlotSet(apply.getCorrelationSlot());
+        Set<Expression> pullUpPredicates = Sets.newLinkedHashSet();
+        Set<Expression> filterPredicates = Sets.newLinkedHashSet();
+        for (Expression conjunct : filter.getConjuncts()) {
+            Set<Slot> conjunctSlots = conjunct.getInputSlots();
+            if (Sets.intersection(conjunctSlots, correlatedSlots).isEmpty()) {
+                filterPredicates.add(conjunct);
+            } else {
+                pullUpPredicates.add(conjunct);
+            }
+        }
+
+        LogicalProject<Plan> newProject = 
project.withProjectsAndChild(newProjects,
+                filterPredicates.isEmpty() ? filter.child()
+                        : filter.withConjuncts(filterPredicates));
+        LogicalFilter<Plan> newFilter = new LogicalFilter<>(pullUpPredicates, 
newProject);
         LogicalAggregate<Plan> newAgg = 
agg.withChildren(ImmutableList.of(newFilter));
         return (LogicalApply<?, ?>) (apply.withChildren(apply.left(),
                 isRightChildAgg ? newAgg : 
apply.right().withChildren(newAgg)));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectUnderApply.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectUnderApply.java
index 79750d55f6f..b2398ee3b56 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectUnderApply.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectUnderApply.java
@@ -61,9 +61,13 @@ public class PullUpProjectUnderApply extends 
OneRewriteRuleFactory {
                     Plan newCorrelate = apply.withChildren(apply.left(), 
project.child());
                     List<NamedExpression> newProjects = new 
ArrayList<>(apply.left().getOutput());
                     if (apply.getSubqueryExpr() instanceof ScalarSubquery) {
-                        Preconditions.checkState(project.getProjects().size() 
== 1,
-                                "ScalarSubquery should only have one output 
column");
-                        newProjects.add(project.getProjects().get(0));
+                        // unnest correlated scalar subquery may add count(*) 
and any_value() to project list
+                        // the previous SubqueryToApply rule will make sure of 
it. So the output column
+                        // may be 1 or 2, we add a check here.
+                        int size = project.getProjects().size();
+                        Preconditions.checkState(size == 1 || size == 2,
+                                "ScalarSubquery should only have one or two 
output column");
+                        newProjects.addAll(project.getProjects());
                     }
                     if (apply.isMarkJoin()) {
                         
newProjects.add(apply.getMarkJoinSlotReference().get());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
index 5842beaf3d6..f6f7c2d1100 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
@@ -47,14 +47,14 @@ public class PushDownFilterThroughProject implements 
RewriteRuleFactory {
     @Override
     public List<Rule> buildRules() {
         return ImmutableList.of(
-                logicalFilter(logicalProject())
+                
logicalFilter(logicalProject().whenNot(LogicalProject::containsNoneMovableFunction))
                         .whenNot(filter -> 
ExpressionUtils.containsWindowExpression(filter.child().getProjects()))
                         
.then(PushDownFilterThroughProject::pushDownFilterThroughProject)
                         .toRule(RuleType.PUSH_DOWN_FILTER_THROUGH_PROJECT),
                 // filter(project(limit)) will change to 
filter(limit(project)) by PushdownProjectThroughLimit,
                 // then we should change filter(limit(project)) to 
project(filter(limit))
                 // TODO maybe we could remove this rule, because translator 
already support filter(limit(project))
-                logicalFilter(logicalLimit(logicalProject()))
+                
logicalFilter(logicalLimit(logicalProject().whenNot(LogicalProject::containsNoneMovableFunction)))
                         .whenNot(filter ->
                                 
ExpressionUtils.containsWindowExpression(filter.child().child().getProjects())
                         )
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ScalarSubquery.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ScalarSubquery.java
index 88b354ae821..178debe7db8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ScalarSubquery.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/ScalarSubquery.java
@@ -19,10 +19,17 @@ package org.apache.doris.nereids.trees.expressions;
 
 import org.apache.doris.nereids.exceptions.UnboundException;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
 import org.apache.doris.nereids.types.DataType;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import java.util.List;
 import java.util.Objects;
@@ -33,20 +40,41 @@ import java.util.Optional;
  */
 public class ScalarSubquery extends SubqueryExpr {
 
+    private final boolean hasTopLevelScalarAgg;
+
     public ScalarSubquery(LogicalPlan subquery) {
-        super(Objects.requireNonNull(subquery, "subquery can not be null"));
+        this(subquery, ImmutableList.of());
     }
 
     public ScalarSubquery(LogicalPlan subquery, List<Slot> correlateSlots) {
-        this(Objects.requireNonNull(subquery, "subquery can not be null"),
-                Objects.requireNonNull(correlateSlots, "correlateSlots can not 
be null"),
-                Optional.empty());
+        this(subquery, correlateSlots, Optional.empty());
     }
 
     public ScalarSubquery(LogicalPlan subquery, List<Slot> correlateSlots, 
Optional<Expression> typeCoercionExpr) {
         super(Objects.requireNonNull(subquery, "subquery can not be null"),
                 Objects.requireNonNull(correlateSlots, "correlateSlots can not 
be null"),
                 typeCoercionExpr);
+        hasTopLevelScalarAgg = findTopLevelScalarAgg(subquery, 
ImmutableSet.copyOf(correlateSlots)) != null;
+    }
+
+    public boolean hasTopLevelScalarAgg() {
+        return hasTopLevelScalarAgg;
+    }
+
+    /**
+    * getTopLevelScalarAggFunction
+    */
+    public Optional<NamedExpression> getTopLevelScalarAggFunction() {
+        Plan plan = findTopLevelScalarAgg(queryPlan, 
ImmutableSet.copyOf(correlateSlots));
+        if (plan != null) {
+            LogicalAggregate aggregate = (LogicalAggregate) plan;
+            Preconditions.checkState(aggregate.getAggregateFunctions().size() 
== 1,
+                    "in scalar subquery, should only return 1 column 1 row, "
+                            + "but we found multiple columns ", 
aggregate.getOutputExpressions());
+            return Optional.of((NamedExpression) 
aggregate.getOutputExpressions().get(0));
+        } else {
+            return Optional.empty();
+        }
     }
 
     @Override
@@ -81,4 +109,30 @@ public class ScalarSubquery extends SubqueryExpr {
     public ScalarSubquery withSubquery(LogicalPlan subquery) {
         return new ScalarSubquery(subquery, correlateSlots, typeCoercionExpr);
     }
+
+    /**
+     * for correlated subquery, we define top level scalar agg as if it meets 
the both 2 conditions:
+     * 1. The agg or its child contains correlated slots
+     * 2. only project, sort and subquery alias node can be agg's parent
+     */
+    public static Plan findTopLevelScalarAgg(Plan plan, ImmutableSet<Slot> 
slots) {
+        if (plan instanceof LogicalAggregate) {
+            if (((LogicalAggregate<?>) plan).getGroupByExpressions().isEmpty() 
&& plan.containsSlots(slots)) {
+                return plan;
+            } else {
+                return null;
+            }
+        } else if (plan instanceof LogicalProject || plan instanceof 
LogicalSubQueryAlias
+                || plan instanceof LogicalSort) {
+            for (Plan child : plan.children()) {
+                Plan result = findTopLevelScalarAgg(child, slots);
+                if (result != null) {
+                    return result;
+                }
+            }
+            return null;
+        } else {
+            return null;
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/AlwaysNotNullable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/AlwaysNotNullable.java
index 8fda4d4b020..6b12f9cd642 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/AlwaysNotNullable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/AlwaysNotNullable.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.nereids.trees.expressions.functions;
 
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
 /**
  * nullable is always false.
  *
@@ -27,4 +30,10 @@ public interface AlwaysNotNullable extends ComputeNullable {
     default boolean nullable() {
         return false;
     }
+
+    // return value of this function if the input data is empty.
+    // for example, count(*) of empty table is 0;
+    default Expression resultForEmptyInput() {
+        throw new AnalysisException("should implement resultForEmptyInput() 
for " + this.getClass());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/AlwaysNotNullable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/NoneMovableFunction.java
similarity index 79%
copy from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/AlwaysNotNullable.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/NoneMovableFunction.java
index 8fda4d4b020..46d5e65c921 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/AlwaysNotNullable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/NoneMovableFunction.java
@@ -18,13 +18,8 @@
 package org.apache.doris.nereids.trees.expressions.functions;
 
 /**
- * nullable is always false.
- *
- * e.g. `count(*)`, the output column is always not nullable
+ * FunctionTrait. Means shouldn't push filter through the project with 
NoneMovableFunction
+ * and should not prune any NoneMovableFunction
  */
-public interface AlwaysNotNullable extends ComputeNullable {
-    @Override
-    default boolean nullable() {
-        return false;
-    }
+public interface NoneMovableFunction {
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ArrayAgg.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ArrayAgg.java
index 7e85eafcd2a..bc91207e31f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ArrayAgg.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/ArrayAgg.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.ArrayLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.ArrayType;
@@ -30,6 +31,7 @@ import 
org.apache.doris.nereids.types.coercion.FollowToAnyDataType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -64,4 +66,9 @@ public class ArrayAgg extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapAgg.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapAgg.java
index 1d32910e1a9..eaf766b908d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapAgg.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapAgg.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapEmpty;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.types.BigIntType;
 import org.apache.doris.nereids.types.BitmapType;
@@ -63,4 +64,9 @@ public class BitmapAgg extends AggregateFunction
         Preconditions.checkArgument(children.size() == 1);
         return new BitmapAgg(distinct, children.get(0));
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BitmapEmpty();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapIntersect.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapIntersect.java
index 27d4d136dad..1b7d2d3c3cd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapIntersect.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapIntersect.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapEmpty;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BitmapType;
@@ -78,4 +79,9 @@ public class BitmapIntersect extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BitmapEmpty();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnion.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnion.java
index 3b3a37bb760..cd0756a1c93 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnion.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnion.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
 import org.apache.doris.nereids.trees.expressions.functions.Function;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapEmpty;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BitmapType;
@@ -89,4 +90,9 @@ public class BitmapUnion extends AggregateFunction
     public boolean canRollUp() {
         return true;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BitmapEmpty();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnionCount.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnionCount.java
index 08772b06d57..593c814f22d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnionCount.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnionCount.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
 import org.apache.doris.nereids.trees.expressions.functions.Function;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BigIntType;
@@ -90,4 +91,10 @@ public class BitmapUnionCount extends AggregateFunction
     public boolean canRollUp() {
         return false;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BigIntLiteral(0);
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnionInt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnionInt.java
index edae2d187e4..2efe1631176 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnionInt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/BitmapUnionInt.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BigIntType;
@@ -94,4 +95,9 @@ public class BitmapUnionInt extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BigIntLiteral(0);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectList.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectList.java
index 470054aa894..d6cca2d0b90 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectList.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectList.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.ArrayLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.ArrayType;
@@ -31,6 +32,7 @@ import 
org.apache.doris.nereids.types.coercion.FollowToAnyDataType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -95,4 +97,9 @@ public class CollectList extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectSet.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectSet.java
index 5eeab663fd2..d9e7e7227c6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectSet.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CollectSet.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.ArrayLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.ArrayType;
@@ -31,6 +32,7 @@ import 
org.apache.doris.nereids.types.coercion.FollowToAnyDataType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -101,4 +103,9 @@ public class CollectSet extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Count.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Count.java
index 2bfcbe91b35..10874d47ee3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Count.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Count.java
@@ -24,6 +24,7 @@ import 
org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
 import org.apache.doris.nereids.trees.expressions.functions.Function;
 import 
org.apache.doris.nereids.trees.expressions.functions.window.SupportWindowAnalytic;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BigIntType;
@@ -157,4 +158,9 @@ public class Count extends AggregateFunction
     public boolean canRollUp() {
         return true;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BigIntLiteral(0);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CountByEnum.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CountByEnum.java
index 721471add66..2a4ee7be3f4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CountByEnum.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/CountByEnum.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.StringType;
 import org.apache.doris.nereids.util.ExpressionUtils;
@@ -62,4 +63,9 @@ public class CountByEnum extends AggregateFunction implements 
ExplicitlyCastable
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new StringLiteral("[]");
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupArrayIntersect.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupArrayIntersect.java
index 3d6216d0d09..0720d6838bb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupArrayIntersect.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupArrayIntersect.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.ArrayLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.ArrayType;
@@ -29,6 +30,7 @@ import org.apache.doris.nereids.types.coercion.AnyDataType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -73,4 +75,9 @@ public class GroupArrayIntersect extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Histogram.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Histogram.java
index 1f0c2d60f15..6b0a2759823 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Histogram.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Histogram.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
 import org.apache.doris.nereids.trees.expressions.functions.SearchSignature;
+import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.DoubleType;
 import org.apache.doris.nereids.types.IntegerType;
@@ -112,4 +113,9 @@ public class Histogram extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new VarcharLiteral("{\"num_buckets\":0,\"buckets\":[]}");
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/HllUnion.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/HllUnion.java
index 7f98d1b6c0d..b81fad270b0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/HllUnion.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/HllUnion.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
 import org.apache.doris.nereids.trees.expressions.functions.Function;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.HllEmpty;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.DataType;
@@ -89,4 +90,9 @@ public class HllUnion extends AggregateFunction
     public boolean canRollUp() {
         return true;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new HllEmpty();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/HllUnionAgg.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/HllUnionAgg.java
index 15d02e73faf..b14b61b5be0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/HllUnionAgg.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/HllUnionAgg.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
 import org.apache.doris.nereids.trees.expressions.functions.Function;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BigIntType;
@@ -90,4 +91,9 @@ public class HllUnionAgg extends AggregateFunction
     public boolean canRollUp() {
         return false;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BigIntLiteral(0);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/IntersectCount.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/IntersectCount.java
index 17a74d3eac9..c013b2e8b4c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/IntersectCount.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/IntersectCount.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.BitmapIntersectFunction;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BigIntType;
 import org.apache.doris.nereids.types.BitmapType;
@@ -77,4 +78,9 @@ public class IntersectCount extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BigIntLiteral(0);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MapAgg.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MapAgg.java
index 36cf5ef7edf..744d4a23a66 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MapAgg.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MapAgg.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.MapLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.BinaryExpression;
 import org.apache.doris.nereids.types.MapType;
 import org.apache.doris.nereids.types.coercion.AnyDataType;
@@ -29,6 +30,7 @@ import 
org.apache.doris.nereids.types.coercion.FollowToAnyDataType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -69,4 +71,9 @@ public class MapAgg extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new MapLiteral(new ArrayList<>(), new ArrayList<>(), 
this.getDataType());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiDistinctCount.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiDistinctCount.java
index 7287fc5c554..68d31e3e7bd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiDistinctCount.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiDistinctCount.java
@@ -23,6 +23,7 @@ import org.apache.doris.nereids.trees.expressions.Cast;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BigIntType;
 import org.apache.doris.nereids.types.coercion.AnyDataType;
@@ -87,4 +88,9 @@ public class MultiDistinctCount extends AggregateFunction
     public Expression withMustUseMultiDistinctAgg(boolean 
mustUseMultiDistinctAgg) {
         return new MultiDistinctCount(mustUseMultiDistinctAgg, false, 
children);
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BigIntLiteral(0);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiDistinctSum0.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiDistinctSum0.java
index 628e18e4772..2b0eda06b42 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiDistinctSum0.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiDistinctSum0.java
@@ -23,12 +23,19 @@ import 
org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ComputePrecisionForSum;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.DecimalV3Literal;
+import org.apache.doris.nereids.trees.expressions.literal.DoubleLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.LargeIntLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.types.DecimalV3Type;
 
 import com.google.common.base.Preconditions;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.List;
 
 /** MultiDistinctSum0 */
@@ -89,4 +96,20 @@ public class MultiDistinctSum0 extends AggregateFunction 
implements UnaryExpress
     public Expression withMustUseMultiDistinctAgg(boolean 
mustUseMultiDistinctAgg) {
         return new MultiDistinctSum0(mustUseMultiDistinctAgg, false, 
children.get(0));
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        DataType dataType = getDataType();
+        if (dataType.isBigIntType()) {
+            return new BigIntLiteral(0);
+        } else if (dataType.isLargeIntType()) {
+            return new LargeIntLiteral(new BigInteger("0"));
+        } else if (dataType.isDecimalV3Type()) {
+            return new DecimalV3Literal((DecimalV3Type) dataType, new 
BigDecimal("0"));
+        } else if (dataType.isDoubleType()) {
+            return new DoubleLiteral(0);
+        } else {
+            return new DoubleLiteral(0);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Ndv.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Ndv.java
index 25e5fb103da..ea90bc58791 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Ndv.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Ndv.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
 import org.apache.doris.nereids.trees.expressions.functions.Function;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BigIntType;
@@ -90,4 +91,9 @@ public class Ndv extends AggregateFunction
     public boolean canRollUp() {
         return false;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BigIntLiteral(0);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/PercentileArray.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/PercentileArray.java
index efc2ef0304f..b4d7467e4c0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/PercentileArray.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/PercentileArray.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.ArrayLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.BinaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.ArrayType;
@@ -34,6 +35,7 @@ import org.apache.doris.nereids.types.TinyIntType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -86,4 +88,9 @@ public class PercentileArray extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new ArrayLiteral(new ArrayList<>(), this.getDataType());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/QuantileUnion.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/QuantileUnion.java
index fba37528fd8..3d0729775a5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/QuantileUnion.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/QuantileUnion.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import 
org.apache.doris.nereids.trees.expressions.functions.scalar.QuantileStateEmpty;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.DataType;
@@ -83,4 +84,9 @@ public class QuantileUnion extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new QuantileStateEmpty();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/SequenceCount.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/SequenceCount.java
index 5bbf0cf0b43..7af112ef8e8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/SequenceCount.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/SequenceCount.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.FunctionSignature;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BigIntType;
 import org.apache.doris.nereids.types.BooleanType;
@@ -84,4 +85,9 @@ public class SequenceCount extends AggregateFunction
     public List<FunctionSignature> getSignatures() {
         return SIGNATURES;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        return new BigIntLiteral(0);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Sum0.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Sum0.java
index 1f63c53dabc..fd052a69c0e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Sum0.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/Sum0.java
@@ -25,6 +25,10 @@ import 
org.apache.doris.nereids.trees.expressions.functions.ComputePrecisionForS
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
 import org.apache.doris.nereids.trees.expressions.functions.Function;
 import 
org.apache.doris.nereids.trees.expressions.functions.window.SupportWindowAnalytic;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.DecimalV3Literal;
+import org.apache.doris.nereids.trees.expressions.literal.DoubleLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.LargeIntLiteral;
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BigIntType;
@@ -41,6 +45,8 @@ import org.apache.doris.nereids.types.TinyIntType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.List;
 
 /**
@@ -127,4 +133,20 @@ public class Sum0 extends AggregateFunction
     public boolean canRollUp() {
         return true;
     }
+
+    @Override
+    public Expression resultForEmptyInput() {
+        DataType dataType = getDataType();
+        if (dataType.isBigIntType()) {
+            return new BigIntLiteral(0);
+        } else if (dataType.isLargeIntType()) {
+            return new LargeIntLiteral(new BigInteger("0"));
+        } else if (dataType.isDecimalV3Type()) {
+            return new DecimalV3Literal((DecimalV3Type) dataType, new 
BigDecimal("0"));
+        } else if (dataType.isDoubleType()) {
+            return new DoubleLiteral(0);
+        } else {
+            return new DoubleLiteral(0);
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/AssertTrue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/AssertTrue.java
index a4bffd9903a..b45b3426a25 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/AssertTrue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/AssertTrue.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.functions.AlwaysNotNullable;
 import 
org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
+import 
org.apache.doris.nereids.trees.expressions.functions.NoneMovableFunction;
 import org.apache.doris.nereids.trees.expressions.shape.BinaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.BooleanType;
@@ -36,7 +37,7 @@ import java.util.List;
  * ScalarFunction 'assert_true'.
  */
 public class AssertTrue extends ScalarFunction
-        implements BinaryExpression, ExplicitlyCastableSignature, 
AlwaysNotNullable {
+        implements BinaryExpression, ExplicitlyCastableSignature, 
AlwaysNotNullable, NoneMovableFunction {
 
     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             
FunctionSignature.ret(BooleanType.INSTANCE).args(BooleanType.INSTANCE, 
VarcharType.SYSTEM_DEFAULT));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
index b4f4bd4d3bb..60af0d18666 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java
@@ -73,8 +73,8 @@ public interface Plan extends TreeNode<Plan> {
 
     default boolean containsSlots(ImmutableSet<Slot> slots) {
         return getExpressions().stream().anyMatch(
-                expression -> !Sets.intersection(slots, 
expression.getInputSlots()).isEmpty()
-                        || children().stream().anyMatch(plan -> 
plan.containsSlots(slots)));
+                expression -> !Sets.intersection(slots, 
expression.getInputSlots()).isEmpty())
+                        || children().stream().anyMatch(plan -> 
plan.containsSlots(slots));
     }
 
     default LogicalProperties computeLogicalProperties() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java
index 73d4cb36448..a5d15f1d515 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java
@@ -22,11 +22,13 @@ import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.NoneMovableFunction;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.PlanUtils;
 
 import com.google.common.collect.ImmutableMap;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -61,7 +63,15 @@ public interface Project {
      * @return project list for merged project
      */
     default List<NamedExpression> mergeProjections(Project childProject) {
-        return PlanUtils.mergeProjections(childProject.getProjects(), 
getProjects());
+        List<NamedExpression> projects = new ArrayList<>();
+        projects.addAll(PlanUtils.mergeProjections(childProject.getProjects(), 
getProjects()));
+        for (NamedExpression expression : childProject.getProjects()) {
+            // keep NoneMovableFunction for later use
+            if (expression.containsType(NoneMovableFunction.class)) {
+                projects.add(expression);
+            }
+        }
+        return projects;
     }
 
     /**
@@ -97,4 +107,14 @@ public interface Project {
         }
         return true;
     }
+
+    /** containsNoneMovableFunction */
+    default boolean containsNoneMovableFunction() {
+        for (NamedExpression expression : getProjects()) {
+            if (expression.containsType(NoneMovableFunction.class)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java
index 1484030e9c2..9174e4a40fb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalProject.java
@@ -27,6 +27,7 @@ import org.apache.doris.nereids.trees.expressions.BoundStar;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.NoneMovableFunction;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.Uuid;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
@@ -43,6 +44,7 @@ import com.google.common.collect.ImmutableList.Builder;
 import com.google.common.collect.ImmutableSet;
 import org.json.JSONObject;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -192,7 +194,15 @@ public class LogicalProject<CHILD_TYPE extends Plan> 
extends LogicalUnary<CHILD_
 
     @Override
     public Plan pruneOutputs(List<NamedExpression> prunedOutputs) {
-        return withProjects(prunedOutputs);
+        List<NamedExpression> allProjects = new ArrayList<>(prunedOutputs);
+        for (NamedExpression expression : projects) {
+            if (expression.containsType(NoneMovableFunction.class)) {
+                if (!prunedOutputs.contains(expression)) {
+                    allProjects.add(expression);
+                }
+            }
+        }
+        return withProjects(allProjects);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/FoldConstantTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/FoldConstantTest.java
index 4a5a5e9065c..bd26306c2a0 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/FoldConstantTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/expression/FoldConstantTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.nereids.analyzer.UnboundRelation;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer;
+import org.apache.doris.nereids.rules.expression.rules.FoldConstantRule;
 import org.apache.doris.nereids.rules.expression.rules.FoldConstantRuleOnFE;
 import org.apache.doris.nereids.trees.expressions.Cast;
 import org.apache.doris.nereids.trees.expressions.Expression;
@@ -754,6 +755,21 @@ class FoldConstantTest extends ExpressionRewriteTestHelper 
{
         Assertions.assertTrue(e1.getDataType() instanceof VarcharType);
     }
 
+    @Test
+    void testFoldNvl() {
+        executor = new ExpressionRuleExecutor(ImmutableList.of(
+                ExpressionAnalyzer.FUNCTION_ANALYZER_RULE,
+                bottomUp(
+                        FoldConstantRule.INSTANCE
+                )
+        ));
+
+        assertRewriteExpression("nvl(NULL, 1)", "1");
+        assertRewriteExpression("nvl(NULL, NULL)", "NULL");
+        assertRewriteAfterTypeCoercion("nvl(IA, NULL)", "ifnull(IA, NULL)");
+        assertRewriteAfterTypeCoercion("nvl(IA, 1)", "ifnull(IA, 1)");
+    }
+
     private void assertRewriteExpression(String actualExpression, String 
expectedExpression) {
         ExpressionRewriteContext context = new ExpressionRewriteContext(
                 MemoTestUtils.createCascadesContext(new UnboundRelation(new 
RelationId(1), ImmutableList.of("test_table"))));
diff --git a/regression-test/data/nereids_hint_tpcds_p0/shape/query41.out 
b/regression-test/data/nereids_hint_tpcds_p0/shape/query41.out
index c27e19cc9f2..2dd4aadeae2 100644
--- a/regression-test/data/nereids_hint_tpcds_p0/shape/query41.out
+++ b/regression-test/data/nereids_hint_tpcds_p0/shape/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 744) and (i1.i_manufact_id >= 
704))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/nereids_p0/subquery/correlated_scalar_subquery.out 
b/regression-test/data/nereids_p0/subquery/correlated_scalar_subquery.out
new file mode 100644
index 00000000000..9414a5c9f61
--- /dev/null
+++ b/regression-test/data/nereids_p0/subquery/correlated_scalar_subquery.out
@@ -0,0 +1,108 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_where1 --
+1
+1
+
+-- !select_where2 --
+1
+1
+
+-- !select_where3 --
+
+-- !select_where4 --
+1
+1
+2
+2
+3
+
+-- !select_where5 --
+
+-- !select_where6 --
+2
+2
+
+-- !select_where7 --
+\N
+\N
+2
+2
+3
+3
+20
+22
+24
+
+-- !select_where8 --
+\N
+\N
+1
+1
+2
+2
+3
+3
+20
+22
+24
+
+-- !select_where9 --
+\N
+\N
+1
+1
+2
+2
+3
+3
+20
+22
+24
+
+-- !select_where10 --
+\N
+\N
+1
+1
+2
+2
+3
+3
+20
+22
+24
+
+-- !select_where11 --
+
+-- !select_project1 --
+\N     \N
+1      \N
+2      \N
+3      6
+20     \N
+22     \N
+24     \N
+
+-- !select_project2 --
+\N     \N
+1      \N
+2      \N
+3      6
+20     \N
+22     \N
+24     \N
+
+-- !select_join1 --
+3      4
+
+-- !select_join2 --
+3      4
+
+-- !select_having1 --
+1
+1
+
+-- !select_having2 --
+1
+1
+
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query41.out 
b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query41.out
index c27e19cc9f2..2dd4aadeae2 100644
--- a/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query41.out
+++ b/regression-test/data/nereids_tpcds_shape_sf1000_p0/shape/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 744) and (i1.i_manufact_id >= 
704))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query41.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query41.out
index 34081b60b90..d20341c931a 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query41.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 788) and (i1.i_manufact_id >= 
748))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query41.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query41.out
index 34081b60b90..d20341c931a 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query41.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 788) and (i1.i_manufact_id >= 
748))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query41.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query41.out
index 34081b60b90..d20341c931a 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query41.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 788) and (i1.i_manufact_id >= 
748))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query41.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query41.out
index 34081b60b90..d20341c931a 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query41.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 788) and (i1.i_manufact_id >= 
748))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/new_shapes_p0/tpcds_sf100/noStatsRfPrune/query41.out 
b/regression-test/data/new_shapes_p0/tpcds_sf100/noStatsRfPrune/query41.out
index 34081b60b90..d20341c931a 100644
--- a/regression-test/data/new_shapes_p0/tpcds_sf100/noStatsRfPrune/query41.out
+++ b/regression-test/data/new_shapes_p0/tpcds_sf100/noStatsRfPrune/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 788) and (i1.i_manufact_id >= 
748))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/new_shapes_p0/tpcds_sf100/no_stats_shape/query41.out 
b/regression-test/data/new_shapes_p0/tpcds_sf100/no_stats_shape/query41.out
index 34081b60b90..d20341c931a 100644
--- a/regression-test/data/new_shapes_p0/tpcds_sf100/no_stats_shape/query41.out
+++ b/regression-test/data/new_shapes_p0/tpcds_sf100/no_stats_shape/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 788) and (i1.i_manufact_id >= 
748))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/new_shapes_p0/tpcds_sf100/rf_prune/query41.out 
b/regression-test/data/new_shapes_p0/tpcds_sf100/rf_prune/query41.out
index 34081b60b90..d20341c931a 100644
--- a/regression-test/data/new_shapes_p0/tpcds_sf100/rf_prune/query41.out
+++ b/regression-test/data/new_shapes_p0/tpcds_sf100/rf_prune/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 788) and (i1.i_manufact_id >= 
748))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git a/regression-test/data/new_shapes_p0/tpcds_sf100/shape/query41.out 
b/regression-test/data/new_shapes_p0/tpcds_sf100/shape/query41.out
index 34081b60b90..d20341c931a 100644
--- a/regression-test/data/new_shapes_p0/tpcds_sf100/shape/query41.out
+++ b/regression-test/data/new_shapes_p0/tpcds_sf100/shape/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 788) and (i1.i_manufact_id >= 
748))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git a/regression-test/data/new_shapes_p0/tpcds_sf1000/shape/query41.out 
b/regression-test/data/new_shapes_p0/tpcds_sf1000/shape/query41.out
index c27e19cc9f2..2dd4aadeae2 100644
--- a/regression-test/data/new_shapes_p0/tpcds_sf1000/shape/query41.out
+++ b/regression-test/data/new_shapes_p0/tpcds_sf1000/shape/query41.out
@@ -13,7 +13,7 @@ PhysicalResultSink
 --------------------filter((i1.i_manufact_id <= 744) and (i1.i_manufact_id >= 
704))
 ----------------------PhysicalOlapScan[item] apply RFs: RF0
 ------------------PhysicalProject
---------------------filter((item_cnt > 0))
+--------------------filter((ifnull(item_cnt, 0) > 0))
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/suites/nereids_p0/subquery/correlated_scalar_subquery.groovy 
b/regression-test/suites/nereids_p0/subquery/correlated_scalar_subquery.groovy
new file mode 100644
index 00000000000..80d9cdb4bb2
--- /dev/null
+++ 
b/regression-test/suites/nereids_p0/subquery/correlated_scalar_subquery.groovy
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("correlated_scalar_subquery") {
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+    sql """
+        drop table if exists correlated_scalar_t1;
+    """
+    sql """
+        drop table if exists correlated_scalar_t2;
+    """
+
+    sql """
+        drop table if exists correlated_scalar_t3;
+    """
+
+    sql """
+        create table correlated_scalar_t1
+                (c1 bigint, c2 bigint)
+                ENGINE=OLAP
+        DUPLICATE KEY(c1, c2)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(c1) BUCKETS 1
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+    sql """
+        create table correlated_scalar_t2
+                (c1 bigint, c2 bigint)
+                ENGINE=OLAP
+        DUPLICATE KEY(c1, c2)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(c1) BUCKETS 1
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+
+    sql """
+        create table correlated_scalar_t3
+                (c1 bigint, c2 bigint)
+                ENGINE=OLAP
+        DUPLICATE KEY(c1, c2)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(c1) BUCKETS 1
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+    """
+
+    sql """
+        insert into correlated_scalar_t1 values (1,null),(null,1),(1,2), 
(null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), (22,3), (24,4),(null,null);
+    """
+    sql """
+        insert into correlated_scalar_t2 values (1,null),(null,1),(1,4), 
(1,2), (null,3), (2,4), (3,7), (3,9),(null,null),(5,1);
+    """
+    sql """
+        insert into correlated_scalar_t3 values (1,null),(null,1),(1,9), 
(1,8), (null,7), (2,6), (3,7), (3,9),(null,null),(5,1);
+    """
+
+    qt_select_where1 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select c1 from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1 and correlated_scalar_t2.c2 < 
4) order by c1;"""
+    qt_select_where2 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select any_value(c1) from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1 and correlated_scalar_t2.c2 < 
4) order by c1;"""
+    qt_select_where3 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select e1 from (select 1 k1) as t lateral view 
explode_numbers(5) tmp1 as e1 where correlated_scalar_t1.c1 = e1 and 
correlated_scalar_t1.c2 = e1 order by e1) order by c1;"""
+    qt_select_where4 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select col from (select c1 col from 
correlated_scalar_t2 group by c1 ) tt where correlated_scalar_t1.c1 = tt.col) 
order by c1;"""
+    qt_select_where5 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select col from (select max(c1) over() col from 
correlated_scalar_t2 ) tt where correlated_scalar_t1.c1 = tt.col) order by 
c1;"""
+    qt_select_where6 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select min(correlated_scalar_t2.c1)  from 
correlated_scalar_t2 join correlated_scalar_t3 on correlated_scalar_t2.c1 = 
correlated_scalar_t3.c2 where correlated_scalar_t2.c2 = 
correlated_scalar_t1.c1) order by c1;"""
+    qt_select_where7 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select x from (select count(c1)x from 
correlated_scalar_t2 where correlated_scalar_t1.c1 = correlated_scalar_t2.c1 
order by count(c1))tt) order by c1;"""
+    qt_select_where8 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select count(col) from (select c1 col from 
correlated_scalar_t2 group by c1 ) tt where correlated_scalar_t1.c1 = tt.col) 
order by c1;"""
+    qt_select_where9 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select count(col) from (select max(c1) over() col 
from correlated_scalar_t2) tt where correlated_scalar_t1.c1 = tt.col) order by 
c1;"""
+    qt_select_where10 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select count(correlated_scalar_t2.c1)  from 
correlated_scalar_t2 join correlated_scalar_t3 on correlated_scalar_t2.c1 = 
correlated_scalar_t3.c2 where correlated_scalar_t2.c2 = 
correlated_scalar_t1.c1) order by c1;"""
+    qt_select_where11 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select count(c1) from correlated_scalar_t2 having 
correlated_scalar_t1.c1 = count(c1)) order by c1;"""
+
+    qt_select_project1 """select c1, sum((select c1 from correlated_scalar_t2 
where correlated_scalar_t1.c1 = correlated_scalar_t2.c1 and 
correlated_scalar_t2.c2 > 7)) from correlated_scalar_t1 group by c1 order by 
c1;"""
+    qt_select_project2 """select c1, sum((select any_value(c1) from 
correlated_scalar_t2 where correlated_scalar_t1.c1 = correlated_scalar_t2.c1 
and correlated_scalar_t2.c2 > 7)) from correlated_scalar_t1 group by c1 order 
by c1;"""
+
+    qt_select_join1 """select correlated_scalar_t1.* from correlated_scalar_t1 
join correlated_scalar_t2 on correlated_scalar_t1.c1 = correlated_scalar_t2.c2 
and correlated_scalar_t1.c2 > (select c1 from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1 and correlated_scalar_t2.c2 > 
7);"""
+    qt_select_join2 """select correlated_scalar_t1.* from correlated_scalar_t1 
join correlated_scalar_t2 on correlated_scalar_t1.c1 = correlated_scalar_t2.c2 
and correlated_scalar_t1.c2 > (select any_value(c1) from correlated_scalar_t2 
where correlated_scalar_t1.c1 = correlated_scalar_t2.c1 and 
correlated_scalar_t2.c2 > 7);"""
+
+    qt_select_having1 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select correlated_scalar_t2.c1 from 
correlated_scalar_t2  where correlated_scalar_t2.c2 < 4 having 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1);"""  
+    qt_select_having2 """select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select any_value(correlated_scalar_t2.c1) from 
correlated_scalar_t2  where correlated_scalar_t2.c2 < 4 having 
correlated_scalar_t1.c1 = any_value(correlated_scalar_t2.c1));"""
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select c1 from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1);
+        """
+        exception "correlate scalar subquery must return only 1 row"
+    }
+
+    test {
+        sql """
+              select c1, sum((select c1 from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1)) from correlated_scalar_t1 
group by c1 order by c1;
+        """
+        exception "correlate scalar subquery must return only 1 row"
+    }
+
+    test {
+        sql """
+              select correlated_scalar_t1.* from correlated_scalar_t1 join 
correlated_scalar_t2 on correlated_scalar_t1.c1 = correlated_scalar_t2.c2 and 
correlated_scalar_t1.c2 > (select c1 from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1);
+        """
+        exception "correlate scalar subquery must return only 1 row"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select c1 from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1 limit 2);
+        """
+        exception "limit is not supported in correlated subquery"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select e1 from (select k1 from (select 1 k1 ) as t 
where correlated_scalar_t1.c1 = k1 ) tt lateral view explode_numbers(5) tmp1 as 
e1 order by e1);
+        """
+        exception "access outer query's column before lateral view is not 
supported"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select e1 from (select 1 k1) as t lateral view 
explode_numbers(5) tmp1 as e1 where correlated_scalar_t1.c1 = e1 having 
correlated_scalar_t1.c2 = e1 order by e1);
+        """
+        exception "access outer query's column in two places is not supported"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select e1 from (select 1 k1) as t lateral view 
explode_numbers(5) tmp1 as e1 where correlated_scalar_t1.c1 = e1 or 
correlated_scalar_t1.c2 = e1 order by e1);
+        """
+        exception "Unsupported correlated subquery with correlated predicate"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select correlated_scalar_t1.c1 from 
correlated_scalar_t2);
+        """
+        exception "access outer query's column in project is not supported"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select max(c1) over() from correlated_scalar_t2 
where correlated_scalar_t1.c1 = correlated_scalar_t2.c1 order by c1);
+        """
+        exception "access outer query's column before window function is not 
supported"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select max(correlated_scalar_t1.c1) over() from 
correlated_scalar_t2 order by c1);
+        """
+        exception "access outer query's column in project is not supported"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select min(correlated_scalar_t2.c1)  from 
correlated_scalar_t2 join (select correlated_scalar_t3.c1 from 
correlated_scalar_t3 where correlated_scalar_t1.c1 = correlated_scalar_t3.c2 ) 
tt on correlated_scalar_t2.c2 > tt.c1);
+        """
+        exception "access outer query's column before join is not supported"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select correlated_scalar_t2.c1 from 
correlated_scalar_t2 join correlated_scalar_t3 on correlated_scalar_t1.c1 = 
correlated_scalar_t3.c2 );
+        """
+        exception "access outer query's column in join is not supported"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select c1 from correlated_scalar_t2 order by 
correlated_scalar_t1.c1);
+        """
+        exception "Unknown column"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select c1 from (select c1 from correlated_scalar_t2 
order by correlated_scalar_t1.c1)tt );
+        """
+        exception "Unknown column"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select count(c1) from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1 group by c2);
+        """
+        exception "access outer query's column before agg with group by is not 
supported"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select count(c1) from correlated_scalar_t2 where 
correlated_scalar_t1.c1 = correlated_scalar_t2.c1 having count(c1) > 10);
+        """
+        exception "only project, sort and subquery alias node is allowed after 
agg node"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select count(correlated_scalar_t1.c1) from 
correlated_scalar_t2);
+        """
+        exception "access outer query's column in aggregate is not supported"
+    }
+
+    test {
+        sql """
+              select c1 from correlated_scalar_t1 where 
correlated_scalar_t1.c2 > (select count(col) from (select max(c1) col from 
correlated_scalar_t2 where correlated_scalar_t1.c1 = c1) tt );
+        """
+        exception "access outer query's column before two agg nodes is not 
supported"
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to