This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 9db3502f80265753b38b4ecbb0ee57f7871b6466
Author: Mingming Ge <7mmi...@gmail.com>
AuthorDate: Mon Aug 7 19:02:02 2023 +0800

    KYLIN-5774 Optimize Calcite plan to convert spark logical plan
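
    The patch below replaces the DataFrame-per-operator construction in
    CalciteToSparkPlaner with a stack of Spark Catalyst LogicalPlan nodes; a
    single DataFrame is materialized only in getResult() through
    SparkInternalAgent.getDataFrame, so analysis happens once on the final
    plan instead of once per intermediate DataFrame. A minimal sketch of that
    pattern follows, assuming Spark 3.x internals. The object
    LogicalPlanSketch and its methods are illustrative names for this example
    only, not part of the patch; it sits in the org.apache.spark.sql package
    so the private[sql] Dataset.ofRows is reachable, which is presumably why
    SparkInternalAgent.scala lives under org/apache/spark/sql in this
    repository.

        package org.apache.spark.sql

        import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
        import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

        object LogicalPlanSketch {

          // Compose Catalyst nodes directly instead of wrapping every
          // relational operator in its own DataFrame.
          def filterThenProject(child: LogicalPlan, predicate: Column, keep: Seq[String]): LogicalPlan = {
            val filtered = Filter(predicate.expr, child)
            Project(keep.map(n => UnresolvedAttribute.quoted(n)), filtered)
          }

          // Materialize once at the end, mirroring CalciteToSparkPlaner.getResult.
          def toDataFrame(spark: SparkSession, plan: LogicalPlan): DataFrame =
            Dataset.ofRows(spark, plan)
        }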
---
 pom.xml                                            |   4 +
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 -
 .../src/main/resources/kylin-defaults0.properties  |   1 -
 .../kylin/newten/EnhancedAggPushDownTest.java      |   2 +-
 .../kylin/newten/SpecialColumnNamesTest.java       | 104 ++++++++++++++
 .../apache/kylin/query/engine/QueryExecTest.java   |  59 +-------
 .../_global/project/special_column_names.json      |  35 +++++
 .../c41390c5-b35d-4db3-b167-029874b85a2c.json      |  13 ++
 .../c41390c5-b35d-4db3-b167-029874b85a2c.json      |  63 ++++++++
 .../c41390c5-b35d-4db3-b167-029874b85a2c.json      | 158 +++++++++++++++++++++
 .../special_column_names/table/SSB.CUSTOMER.json   |  68 +++++++++
 .../table/SSB.P_LINEORDER.json                     | 118 +++++++++++++++
 .../kylin/query/runtime/CalciteToSparkPlaner.scala | 121 +++++-----------
 .../kylin/query/runtime/SparderRexVisitor.scala    |  37 ++---
 .../apache/kylin/query/runtime/SparkEngine.java    |   8 +-
 .../kylin/query/runtime/plan/AggregatePlan.scala   |  87 +++++++-----
 .../kylin/query/runtime/plan/FilterPlan.scala      |  25 ++--
 .../apache/kylin/query/runtime/plan/JoinPlan.scala |  58 ++++----
 .../kylin/query/runtime/plan/LimitPlan.scala       |  25 ++--
 .../kylin/query/runtime/plan/MinusPlan.scala       |  14 +-
 .../kylin/query/runtime/plan/ProjectPlan.scala     |  19 +--
 .../kylin/query/runtime/plan/ResultPlan.scala      |   3 +-
 .../apache/kylin/query/runtime/plan/SortPlan.scala |  52 ++++---
 .../kylin/query/runtime/plan/TableScanPlan.scala   | 118 +++++++--------
 .../kylin/query/runtime/plan/UnionPlan.scala       |  30 ++--
 .../kylin/query/runtime/plan/ValuesPlan.scala      |  15 +-
 .../kylin/query/runtime/plan/WindowPlan.scala      |  29 ++--
 .../apache/kylin/query/util/RuntimeHelper.scala    |  31 ++--
 .../kylin/query/util/SparderDerivedUtil.scala      |  52 ++++---
 .../apache/spark/sql/KylinDataFrameManager.scala   |  19 ++-
 .../org/apache/spark/sql/SparkOperation.scala      |  64 ++++++---
 .../sql/execution/utils/SchemaProcessor.scala      |  36 ++---
 .../spark/sql/manager/SparderLookupManager.scala   |  69 ++++-----
 .../kylin/query/sql/KylinDataFrameManagerTest.java |  32 +++--
 .../query/runtime/plan/SegmentEmptyTest.scala      |  38 ++---
 .../org/apache/spark/sql/SparkInternalAgent.scala  |   0
 .../sql/datasource/storage/StorageStore.scala      |  73 +++++-----
 .../scala/org/apache/kylin/common/JobSupport.scala |   7 +-
 38 files changed, 1104 insertions(+), 587 deletions(-)

diff --git a/pom.xml b/pom.xml
index 71763ea249..dac240b005 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1509,6 +1509,10 @@
                         <groupId>org.codehaus.jackson</groupId>
                         <artifactId>jackson-mapper-asl</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.calcite</groupId>
+                        <artifactId>calcite</artifactId>
+                    </exclusion>
                 </exclusions>
                 <scope>provided</scope>
             </dependency>
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 5bfe8c262a..c1dd5379e3 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1906,10 +1906,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.query.schema-cache-enabled", FALSE));
     }
 
-    public boolean isDataFrameCacheEnabled() {
-        return Boolean.parseBoolean(this.getOptional("kylin.query.dataframe-cache-enabled", TRUE));
-    }
-
     public boolean enableReplaceDynamicParams() {
         return Boolean.parseBoolean(this.getOptional("kylin.query.replace-dynamic-params-enabled", FALSE));
     }
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties b/src/core-common/src/main/resources/kylin-defaults0.properties
index b10ef59365..56cd346578 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -232,7 +232,6 @@ kylin.query.join-match-optimization-enabled=false
 kylin.query.convert-count-distinct-expression-enabled=false
 kylin.query.memory-limit-during-collect-mb=5400
 kylin.query.cartesian-partition-num-threshold-factor=100
-kylin.query.dataframe-cache-enabled=true
 
 # spark context canary to monitor spark
 kylin.canary.sqlcontext-enabled=false
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java
index a10ab79acb..b5f3646ebd 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java
@@ -131,7 +131,7 @@ public class EnhancedAggPushDownTest extends NLocalWithSparkSessionTest {
             queryExec.executeQuery(sql);
         } catch (Exception e) {
             Assert.assertTrue(e instanceof SQLException);
-            Assert.assertTrue(queryExec.getSparderQueryOptimizedExceptionMsg().contains("Path does not exist"));
+            Assert.assertTrue(queryExec.getSparderQueryOptimizedExceptionMsg().contains("does not exist"));
         }
         return ContextUtil.listContexts();
     }
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SpecialColumnNamesTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SpecialColumnNamesTest.java
new file mode 100644
index 0000000000..f78bc35983
--- /dev/null
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SpecialColumnNamesTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.newten;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.util.Shell;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.query.engine.QueryExec;
+import org.apache.kylin.util.ExecAndComp;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.StaticSQLConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import lombok.val;
+
+public class SpecialColumnNamesTest extends NLocalWithSparkSessionTest {
+
+    @BeforeClass
+    public static void initSpark() {
+        if (Shell.MAC)
+            overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");//for snappy
+        if (ss != null && !ss.sparkContext().isStopped()) {
+            ss.stop();
+        }
+        sparkConf = new SparkConf().setAppName(UUID.randomUUID().toString()).setMaster("local[4]");
+        sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
+        sparkConf.set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), "in-memory");
+        sparkConf.set("spark.sql.shuffle.partitions", "1");
+        sparkConf.set("spark.memory.fraction", "0.1");
+        // opt memory
+        sparkConf.set("spark.shuffle.detectCorrupt", "false");
+        // For sinai_poc/query03, enable implicit cross join conversion
+        sparkConf.set("spark.sql.crossJoin.enabled", "true");
+        sparkConf.set("spark.sql.adaptive.enabled", "false");
+        sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "1");
+        ss = SparkSession.builder().config(sparkConf).getOrCreate();
+        SparderEnv.setSparkSession(ss);
+    }
+
+    @Before
+    public void setup() throws IOException {
+        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
+        this.createTestMetadata("src/test/resources/ut_meta/special_column_names");
+        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject());
+        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        NDefaultScheduler.destroyInstance();
+        cleanupTestMetadata();
+    }
+
+    public String getProject() {
+        return "special_column_names";
+    }
+
+    @Test
+    public void testSpecialColumnNames() throws Exception {
+        fullBuild("c41390c5-b35d-4db3-b167-029874b85a2c");
+
+        populateSSWithCSVData(getTestConfig(), getProject(), ss);
+        List<Pair<String, String>> query = new ArrayList<>();
+        String sql = "select count(1), \"//LO_LINENUMBER\",  \"LO_CU//STKEY\" from SSB.P_LINEORDER group by \"//LO_LINENUMBER\", \"LO_CU//STKEY\"";
+        query.add(Pair.newPair("special_column_names", sql));
+        ExecAndComp.execAndCompare(query, getProject(), ExecAndComp.CompareLevel.SAME, "default");
+
+        QueryExec queryExec = new QueryExec(getProject(), KylinConfig.getInstanceFromEnv());
+        val resultSet = queryExec.executeQuery(sql);
+        Assert.assertEquals(140, resultSet.getRows().size());
+    }
+}
diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecTest.java
index a13cd7fb6a..ba34dc387c 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecTest.java
@@ -20,16 +20,10 @@ package org.apache.kylin.query.engine;
 
 import java.sql.SQLException;
 
-import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Unsafe;
-import org.apache.kylin.query.engine.meta.SimpleDataContext;
-import org.apache.kylin.query.relnode.KapRel;
-import org.apache.kylin.query.runtime.CalciteToSparkPlaner;
-import org.apache.kylin.query.util.QueryContextCutter;
 import org.apache.kylin.query.util.QueryHelper;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -40,10 +34,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.springframework.security.util.FieldUtils;
 
 import lombok.val;
-import scala.Function0;
 
 public class QueryExecTest extends NLocalFileMetadataTestCase {
 
@@ -70,7 +62,8 @@ public class QueryExecTest extends NLocalFileMetadataTestCase {
     }
 
     /**
-     *  <p>See also {@link org.apache.kylin.query.engine.QueryExecTest#testSumCaseWhenHasNull()}
+     * <p>See also {@link org.apache.kylin.query.engine.QueryExecTest#testSumCaseWhenHasNull()}
+     *
      * @throws SQLException
      */
     @Test
@@ -90,6 +83,7 @@ public class QueryExecTest extends NLocalFileMetadataTestCase {
      * <code>case COUNT(null) when 0 then null else SUM0(null) end</code>, which is incompatible with model section
      *
      * <p>See also {@link org.apache.kylin.query.engine.QueryExecTest#testWorkWithoutKapAggregateReduceFunctionsRule()}
+     *
      * @throws SqlParseException
      */
     @Test
@@ -140,51 +134,4 @@ public class QueryExecTest extends NLocalFileMetadataTestCase {
             }
         }
     }
-
-    @Test
-    public void testSparkPlanWithoutCache() throws IllegalAccessException, SqlParseException {
-        overwriteSystemProp("kylin.query.dataframe-cache-enabled", "false");
-        String sql = "select count(*) from TEST_KYLIN_FACT group by seller_id";
-        QueryExec qe = new QueryExec(project, KylinConfig.getInstanceFromEnv());
-        RelNode root = qe.parseAndOptimize(sql);
-        SimpleDataContext dataContext = (SimpleDataContext) FieldUtils.getFieldValue(qe, "dataContext");
-        QueryContextCutter.selectRealization(root, BackdoorToggles.getIsQueryFromAutoModeling());
-        CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext) {
-            @Override
-            public Dataset<Row> actionWithCache(KapRel rel, Function0<Dataset<Row>> body) {
-                throw new RuntimeException();
-            }
-        };
-        try {
-            calciteToSparkPlaner.go(root.getInput(0));
-        } finally {
-            calciteToSparkPlaner.cleanCache();
-        }
-    }
-
-    @Test
-    public void testSparkPlanWithCache() throws IllegalAccessException, SqlParseException {
-        overwriteSystemProp("kylin.query.dataframe-cache-enabled", "true");
-        String sql = "select count(*) from TEST_KYLIN_FACT group by seller_id";
-        QueryExec qe = new QueryExec(project, KylinConfig.getInstanceFromEnv());
-        RelNode root = qe.parseAndOptimize(sql);
-        SimpleDataContext dataContext = (SimpleDataContext) FieldUtils.getFieldValue(qe, "dataContext");
-        QueryContextCutter.selectRealization(root, BackdoorToggles.getIsQueryFromAutoModeling());
-        CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext) {
-            @Override
-            public Dataset<Row> actionWithCache(KapRel rel, Function0<Dataset<Row>> body) {
-                throw new IllegalStateException();
-            }
-        };
-        Exception expectException = null;
-        try {
-            calciteToSparkPlaner.go(root.getInput(0));
-        } catch (Exception e) {
-            expectException = e;
-        } finally {
-            calciteToSparkPlaner.cleanCache();
-            Assert.assertTrue(expectException instanceof IllegalStateException);
-        }
-    }
-
 }
diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/_global/project/special_column_names.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/_global/project/special_column_names.json
new file mode 100644
index 0000000000..c7500a3bc8
--- /dev/null
+++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/_global/project/special_column_names.json
@@ -0,0 +1,35 @@
+{
+  "uuid" : "24854c55-8298-623e-e64a-42ae6faf1f1a",
+  "last_modified" : 1677855913210,
+  "create_time" : 1677855913185,
+  "version" : "4.0.0.0",
+  "name" : "special_column_names",
+  "owner" : "ADMIN",
+  "status" : "ENABLED",
+  "create_time_utc" : 1677855913185,
+  "default_database" : "DEFAULT",
+  "description" : "",
+  "principal" : null,
+  "keytab" : null,
+  "maintain_model_type" : "MANUAL_MAINTAIN",
+  "override_kylin_properties" : {
+    "kylin.metadata.semi-automatic-mode" : "false",
+    "kylin.query.metadata.expose-computed-column" : "true",
+    "kylin.source.default" : "9"
+  },
+  "segment_config" : {
+    "auto_merge_enabled" : false,
+    "auto_merge_time_ranges" : [ "WEEK", "MONTH", "QUARTER", "YEAR" ],
+    "volatile_range" : {
+      "volatile_range_number" : 0,
+      "volatile_range_enabled" : false,
+      "volatile_range_type" : "DAY"
+    },
+    "retention_range" : {
+      "retention_range_number" : 1,
+      "retention_range_enabled" : false,
+      "retention_range_type" : "MONTH"
+    },
+    "create_empty_segment_enabled" : false
+  }
+}
\ No newline at end of file
diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/dataflow/c41390c5-b35d-4db3-b167-029874b85a2c.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/dataflow/c41390c5-b35d-4db3-b167-029874b85a2c.json
new file mode 100644
index 0000000000..c2e8cc33ed
--- /dev/null
+++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/dataflow/c41390c5-b35d-4db3-b167-029874b85a2c.json
@@ -0,0 +1,13 @@
+{
+  "uuid" : "c41390c5-b35d-4db3-b167-029874b85a2c",
+  "last_modified" : 0,
+  "create_time" : 1677856069390,
+  "version" : "4.0.0.0",
+  "status" : "OFFLINE",
+  "last_status" : null,
+  "cost" : 50,
+  "query_hit_count" : 0,
+  "last_query_time" : 0,
+  "layout_query_hit_count" : { },
+  "segments" : [ ]
+}
\ No newline at end of file
diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/index_plan/c41390c5-b35d-4db3-b167-029874b85a2c.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/index_plan/c41390c5-b35d-4db3-b167-029874b85a2c.json
new file mode 100644
index 0000000000..354bdadf2e
--- /dev/null
+++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/index_plan/c41390c5-b35d-4db3-b167-029874b85a2c.json
@@ -0,0 +1,63 @@
+{
+  "uuid" : "c41390c5-b35d-4db3-b167-029874b85a2c",
+  "last_modified" : 1677856069297,
+  "create_time" : 1677856069297,
+  "version" : "4.0.0.0",
+  "description" : null,
+  "rule_based_index" : null,
+  "indexes" : [ {
+    "id" : 0,
+    "dimensions" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17 ],
+    "measures" : [ 100000 ],
+    "layouts" : [ {
+      "id" : 1,
+      "name" : null,
+      "owner" : null,
+      "col_order" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 100000 ],
+      "shard_by_columns" : [ ],
+      "partition_by_columns" : [ ],
+      "sort_by_columns" : [ ],
+      "storage_type" : 20,
+      "update_time" : 1677856069309,
+      "manual" : false,
+      "auto" : false,
+      "base" : true,
+      "draft_version" : null,
+      "index_range" : null
+    } ],
+    "next_layout_offset" : 2
+  }, {
+    "id" : 20000000000,
+    "dimensions" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17 ],
+    "measures" : [ ],
+    "layouts" : [ {
+      "id" : 20000000001,
+      "name" : null,
+      "owner" : null,
+      "col_order" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17 ],
+      "shard_by_columns" : [ ],
+      "partition_by_columns" : [ ],
+      "sort_by_columns" : [ ],
+      "storage_type" : 20,
+      "update_time" : 1677856069310,
+      "manual" : false,
+      "auto" : false,
+      "base" : true,
+      "draft_version" : null,
+      "index_range" : null
+    } ],
+    "next_layout_offset" : 2
+  } ],
+  "override_properties" : { },
+  "to_be_deleted_indexes" : [ ],
+  "auto_merge_time_ranges" : null,
+  "retention_range" : 0,
+  "engine_type" : 80,
+  "next_aggregation_index_id" : 10000,
+  "next_table_index_id" : 20000010000,
+  "agg_shard_by_columns" : [ ],
+  "extend_partition_columns" : [ ],
+  "layout_bucket_num" : { },
+  "approved_additional_recs" : 0,
+  "approved_removal_recs" : 0
+}
\ No newline at end of file
diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/model_desc/c41390c5-b35d-4db3-b167-029874b85a2c.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/model_desc/c41390c5-b35d-4db3-b167-029874b85a2c.json
new file mode 100644
index 0000000000..13c33c24e2
--- /dev/null
+++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/model_desc/c41390c5-b35d-4db3-b167-029874b85a2c.json
@@ -0,0 +1,158 @@
+{
+  "uuid" : "c41390c5-b35d-4db3-b167-029874b85a2c",
+  "last_modified" : 1677856069295,
+  "create_time" : 1677856068306,
+  "version" : "4.0.0.0",
+  "alias" : "p_lineorder",
+  "owner" : "ADMIN",
+  "config_last_modifier" : null,
+  "config_last_modified" : 0,
+  "description" : "",
+  "fact_table" : "SSB.P_LINEORDER",
+  "fact_table_alias" : null,
+  "management_type" : "MODEL_BASED",
+  "join_tables" : [ ],
+  "filter_condition" : "",
+  "partition_desc" : {
+    "partition_date_column" : "P_LINEORDER.LO_ORDERDATE",
+    "partition_date_start" : 0,
+    "partition_date_format" : "yyyy-MM-dd",
+    "partition_type" : "APPEND",
+    "partition_condition_builder" : "org.apache.kylin.metadata.model.PartitionDesc$DefaultPartitionConditionBuilder"
+  },
+  "capacity" : "MEDIUM",
+  "segment_config" : {
+    "auto_merge_enabled" : null,
+    "auto_merge_time_ranges" : null,
+    "volatile_range" : null,
+    "retention_range" : null,
+    "create_empty_segment_enabled" : false
+  },
+  "data_check_desc" : null,
+  "semantic_version" : 0,
+  "storage_type" : 0,
+  "model_type" : "BATCH",
+  "all_named_columns" : [ {
+    "id" : 0,
+    "name" : "LO_SHIPMODE",
+    "column" : "P_LINEORDER.LO_SHIPMODE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 1,
+    "name" : "//LO_LINENUMBER",
+    "column" : "P_LINEORDER.//LO_LINENUMBER",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 2,
+    "name" : "LO_ORDTOTALPRICE",
+    "column" : "P_LINEORDER.LO_ORDTOTALPRICE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 3,
+    "name" : "LO_SUPPLYCOST",
+    "column" : "P_LINEORDER.LO_SUPPLYCOST",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 4,
+    "name" : "LO_SUPPKEY",
+    "column" : "P_LINEORDER.LO_SUPPKEY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 5,
+    "name" : "LO_QUANTITY",
+    "column" : "P_LINEORDER.LO_QUANTITY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 6,
+    "name" : "LO_PARTKEY",
+    "column" : "P_LINEORDER.LO_PARTKEY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 7,
+    "name" : "LO_ORDERKEY",
+    "column" : "P_LINEORDER.LO_ORDERKEY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 8,
+    "name" : "LO_CU//STKEY",
+    "column" : "P_LINEORDER.LO_CU//STKEY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 9,
+    "name" : "LO_SHIPPRIOTITY",
+    "column" : "P_LINEORDER.LO_SHIPPRIOTITY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 10,
+    "name" : "LO_DISCOUNT",
+    "column" : "P_LINEORDER.LO_DISCOUNT",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 11,
+    "name" : "LO_ORDERPRIOTITY",
+    "column" : "P_LINEORDER.LO_ORDERPRIOTITY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 12,
+    "name" : "LO_ORDERDATE",
+    "column" : "P_LINEORDER.LO_ORDERDATE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 13,
+    "name" : "LO_REVENUE",
+    "column" : "P_LINEORDER.LO_REVENUE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 14,
+    "name" : "V_REVENUE",
+    "column" : "P_LINEORDER.V_REVENUE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 15,
+    "name" : "LO_COMMITDATE",
+    "column" : "P_LINEORDER.LO_COMMITDATE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 16,
+    "name" : "LO_EXTENDEDPRICE",
+    "column" : "P_LINEORDER.LO_EXTENDEDPRICE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 17,
+    "name" : "LO_TAX",
+    "column" : "P_LINEORDER.LO_TAX",
+    "status" : "DIMENSION"
+  } ],
+  "all_measures" : [ {
+    "name" : "COUNT_ALL",
+    "function" : {
+      "expression" : "COUNT",
+      "parameters" : [ {
+        "type" : "constant",
+        "value" : "1"
+      } ],
+      "returntype" : "bigint"
+    },
+    "column" : null,
+    "comment" : null,
+    "id" : 100000,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  } ],
+  "recommendations_count" : 0,
+  "computed_columns" : [ ],
+  "canvas" : {
+    "coordinate" : {
+      "P_LINEORDER" : {
+        "x" : 530.6666395399305,
+        "y" : 249.00000678168405,
+        "width" : 220.0,
+        "height" : 200.0
+      }
+    },
+    "zoom" : 9.0
+  },
+  "multi_partition_desc" : null,
+  "multi_partition_key_mapping" : null,
+  "fusion_id" : null
+}
\ No newline at end of file
diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.CUSTOMER.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.CUSTOMER.json
new file mode 100644
index 0000000000..ba734604c1
--- /dev/null
+++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.CUSTOMER.json
@@ -0,0 +1,68 @@
+{
+  "uuid" : "9481374f-63be-5f0b-5a6d-22c7a561b1d9",
+  "last_modified" : 0,
+  "create_time" : 1677856024986,
+  "version" : "4.0.0.0",
+  "name" : "CUSTOMER",
+  "columns" : [ {
+    "id" : "1",
+    "name" : "//C_CUSTKEY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "c_custkey"
+  }, {
+    "id" : "2",
+    "name" : "C_N/@#AME",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_name"
+  }, {
+    "id" : "3",
+    "name" : "/C_ADDR//ESS",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_address"
+  }, {
+    "id" : "4",
+    "name" : "C_CITY",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_city"
+  }, {
+    "id" : "5",
+    "name" : "C_NATION",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_nation"
+  }, {
+    "id" : "6",
+    "name" : "C_REGION",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_region"
+  }, {
+    "id" : "7",
+    "name" : "C_PHONE",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_phone"
+  }, {
+    "id" : "8",
+    "name" : "C_MKTSEGMENT",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_mktsegment"
+  } ],
+  "source_type" : 9,
+  "table_type" : "EXTERNAL",
+  "top" : false,
+  "increment_loading" : false,
+  "last_snapshot_path" : null,
+  "last_snapshot_size" : 0,
+  "snapshot_last_modified" : 0,
+  "query_hit_count" : 0,
+  "partition_column" : null,
+  "snapshot_partitions" : { },
+  "snapshot_partitions_info" : { },
+  "snapshot_total_rows" : 0,
+  "snapshot_partition_col" : null,
+  "selected_snapshot_partition_col" : null,
+  "temp_snapshot_path" : null,
+  "snapshot_has_broken" : false,
+  "database" : "SSB",
+  "transactional" : false,
+  "rangePartition" : false,
+  "partition_desc" : null
+}
\ No newline at end of file
diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.P_LINEORDER.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.P_LINEORDER.json
new file mode 100644
index 0000000000..0ff6cd2b7d
--- /dev/null
+++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.P_LINEORDER.json
@@ -0,0 +1,118 @@
+{
+  "uuid" : "f0dc3d45-0eb0-26c0-4681-b6c0d0c8e270",
+  "last_modified" : 0,
+  "create_time" : 1677856025283,
+  "version" : "4.0.0.0",
+  "name" : "P_LINEORDER",
+  "columns" : [ {
+    "id" : "1",
+    "name" : "LO_ORDERKEY",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_orderkey"
+  }, {
+    "id" : "2",
+    "name" : "//LO_LINENUMBER",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "//lo_linenumber"
+  }, {
+    "id" : "3",
+    "name" : "LO_CU//STKEY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "lo_cu//stkey"
+  }, {
+    "id" : "4",
+    "name" : "LO_PARTKEY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "lo_partkey"
+  }, {
+    "id" : "5",
+    "name" : "LO_SUPPKEY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "lo_suppkey"
+  }, {
+    "id" : "6",
+    "name" : "LO_ORDERDATE",
+    "datatype" : "date",
+    "case_sensitive_name" : "lo_orderdate"
+  }, {
+    "id" : "7",
+    "name" : "LO_ORDERPRIOTITY",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "lo_orderpriotity"
+  }, {
+    "id" : "8",
+    "name" : "LO_SHIPPRIOTITY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "lo_shippriotity"
+  }, {
+    "id" : "9",
+    "name" : "LO_QUANTITY",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_quantity"
+  }, {
+    "id" : "10",
+    "name" : "LO_EXTENDEDPRICE",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_extendedprice"
+  }, {
+    "id" : "11",
+    "name" : "LO_ORDTOTALPRICE",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_ordtotalprice"
+  }, {
+    "id" : "12",
+    "name" : "LO_DISCOUNT",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_discount"
+  }, {
+    "id" : "13",
+    "name" : "LO_REVENUE",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_revenue"
+  }, {
+    "id" : "14",
+    "name" : "LO_SUPPLYCOST",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_supplycost"
+  }, {
+    "id" : "15",
+    "name" : "LO_TAX",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_tax"
+  }, {
+    "id" : "16",
+    "name" : "LO_COMMITDATE",
+    "datatype" : "date",
+    "case_sensitive_name" : "lo_commitdate"
+  }, {
+    "id" : "17",
+    "name" : "LO_SHIPMODE",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "lo_shipmode"
+  }, {
+    "id" : "18",
+    "name" : "V_REVENUE",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "v_revenue"
+  } ],
+  "source_type" : 9,
+  "table_type" : "MANAGED_TABLE",
+  "top" : false,
+  "increment_loading" : false,
+  "last_snapshot_path" : null,
+  "last_snapshot_size" : 0,
+  "snapshot_last_modified" : 0,
+  "query_hit_count" : 0,
+  "partition_column" : null,
+  "snapshot_partitions" : { },
+  "snapshot_partitions_info" : { },
+  "snapshot_total_rows" : 0,
+  "snapshot_partition_col" : null,
+  "selected_snapshot_partition_col" : null,
+  "temp_snapshot_path" : null,
+  "snapshot_has_broken" : false,
+  "database" : "SSB",
+  "transactional" : false,
+  "rangePartition" : false,
+  "partition_desc" : null
+}
\ No newline at end of file
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala
index 76e9940270..13dbfcc987 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala
@@ -15,30 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kylin.query.runtime
 
 import java.util
-import java.util.Collections
 
 import org.apache.calcite.DataContext
 import org.apache.calcite.rel.{RelNode, RelVisitor}
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.engine.spark.utils.LogEx
-import org.apache.kylin.guava30.shaded.common.collect.Lists
-import org.apache.kylin.query.relnode.{KapAggregateRel, KapFilterRel, KapJoinRel, KapLimitRel, KapMinusRel, KapModelViewRel, KapNonEquiJoinRel, KapProjectRel, KapRel, KapSortRel, KapTableScan, KapUnionRel, KapValuesRel, KapWindowRel}
-import org.apache.kylin.query.runtime.plan.{AggregatePlan, FilterPlan, LimitPlan, ProjectPlan, SortPlan, TableScanPlan, ValuesPlan, WindowPlan}
-import org.apache.kylin.query.util.KapRelUtil
-import org.apache.spark.sql.DataFrame
-
-import scala.collection.JavaConverters._
-
+import org.apache.kylin.query.relnode._
+import org.apache.kylin.query.runtime.plan._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{DataFrame, SparderEnv, SparkInternalAgent}
 
 class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with LogEx {
-  private val stack = new util.Stack[DataFrame]()
+  private val stack = new util.Stack[LogicalPlan]()
   private val setOpStack = new util.Stack[Int]()
   private var unionLayer = 0
-  private val dataframeCache = KylinConfig.getInstanceFromEnv.isDataFrameCacheEnabled
 
   // clear cache before any op
   cleanCache()
@@ -61,24 +54,28 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log
     stack.push(node match {
       case rel: KapTableScan => convertTableScan(rel)
       case rel: KapFilterRel =>
-        logTime("filter") { FilterPlan.filter(Lists.newArrayList(stack.pop()), rel, dataContext) }
+        logTime("filter") {
+          FilterPlan.filter(stack.pop(), rel, dataContext)
+        }
       case rel: KapProjectRel =>
         logTime("project") {
-          actionWith(rel) {
-            ProjectPlan.select(Lists.newArrayList(stack.pop()), rel, dataContext)
-          }
+          ProjectPlan.select(stack.pop(), rel, dataContext)
         }
       case rel: KapLimitRel =>
-        logTime("limit") { LimitPlan.limit(Lists.newArrayList(stack.pop()), rel, dataContext) }
+        logTime("limit") {
+          LimitPlan.limit(stack.pop(), rel, dataContext)
+        }
       case rel: KapSortRel =>
-        logTime("sort") { SortPlan.sort(Lists.newArrayList(stack.pop()), rel, dataContext) }
+        logTime("sort") {
+          SortPlan.sort(stack.pop(), rel, dataContext)
+        }
       case rel: KapWindowRel =>
-        logTime("window") { WindowPlan.window(Lists.newArrayList(stack.pop()), rel, dataContext) }
+        logTime("window") {
+          WindowPlan.window(stack.pop(), rel, dataContext)
+        }
       case rel: KapAggregateRel =>
         logTime("agg") {
-          actionWith(rel) {
-            AggregatePlan.agg(Lists.newArrayList(stack.pop()), rel)
-          }
+          AggregatePlan.agg(stack.pop(), rel)
         }
       case rel: KapJoinRel => convertJoinRel(rel)
       case rel: KapNonEquiJoinRel => convertNonEquiJoinRel(rel)
@@ -88,21 +85,29 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log
         if (KylinConfig.getInstanceFromEnv.isCollectUnionInOrder) {
           unionBlocks = unionBlocks.reverse
         }
-        logTime("union") { plan.UnionPlan.union(unionBlocks.asJava, rel, dataContext) }
+        logTime("union") {
+          plan.UnionPlan.union(unionBlocks, rel, dataContext)
+        }
       case rel: KapMinusRel =>
         val size = setOpStack.pop()
-        logTime("minus") { plan.MinusPlan.minus(Range(0, stack.size() - size).map(a => stack.pop()).reverse, rel, dataContext) }
+        logTime("minus") {
+          plan.MinusPlan.minus(Range(0, stack.size() - size).map(a => stack.pop()).reverse, rel, dataContext)
+        }
       case rel: KapValuesRel =>
-        logTime("values") { ValuesPlan.values(rel) }
+        logTime("values") {
+          ValuesPlan.values(rel)
+        }
       case rel: KapModelViewRel =>
-        logTime("modelview") { stack.pop() }
+        logTime("modelview") {
+          stack.pop()
+        }
     })
     if (node.isInstanceOf[KapUnionRel]) {
       unionLayer = unionLayer - 1
     }
   }
 
-  private def convertTableScan(rel: KapTableScan): DataFrame = {
+  private def convertTableScan(rel: KapTableScan): LogicalPlan = {
     rel.getContext.genExecFunc(rel, rel.getTableName) match {
       case "executeLookupTableQuery" =>
         logTime("createLookupTable") {
@@ -123,7 +128,7 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log
     }
   }
 
-  private def convertJoinRel(rel: KapJoinRel): DataFrame = {
+  private def convertJoinRel(rel: KapJoinRel): LogicalPlan = {
     if (!rel.isRuntimeJoin) {
       rel.getContext.genExecFunc(rel, "") match {
         case "executeMetadataQuery" =>
@@ -139,12 +144,12 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log
       val right = stack.pop()
       val left = stack.pop()
       logTime("join") {
-        plan.JoinPlan.join(Lists.newArrayList(left, right), rel)
+        plan.JoinPlan.join(Seq.apply(left, right), rel)
       }
     }
   }
 
-  private def convertNonEquiJoinRel(rel: KapNonEquiJoinRel): DataFrame = {
+  private def convertNonEquiJoinRel(rel: KapNonEquiJoinRel): LogicalPlan = {
     if (!rel.isRuntimeJoin) {
       logTime("join with table scan") {
         TableScanPlan.createOLAPTable(rel)
@@ -153,65 +158,17 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log
       val right = stack.pop()
       val left = stack.pop()
       logTime("non-equi join") {
-        plan.JoinPlan.nonEquiJoin(Lists.newArrayList(left, right), rel, dataContext)
-      }
-    }
-  }
-
-  def actionWith(rel: KapRel)(body: => DataFrame): DataFrame = {
-    if (!dataframeCache) {
-      body
-    } else {
-      actionWithCache(rel) {
-        body
-      }
-    }
-  }
-
-  protected def actionWithCache(rel: KapRel)(body: => DataFrame): DataFrame = {
-    var layoutId = 0L
-    var modelId = ""
-    var pruningSegmentHashCode = 0
-    if (rel.getDigest == null) {
-      body
-    } else {
-      try {
-        val storageContext = rel.getContext.storageContext
-        val layoutEntity = storageContext.getCandidate.getLayoutEntity
-        val prunedSegments = storageContext.getPrunedSegments
-        val tempTimeRange = new StringBuilder()
-        if (prunedSegments != null) {
-          prunedSegments.forEach(segment => {
-            tempTimeRange.append(segment.getName)
-          })
-        }
-        pruningSegmentHashCode = tempTimeRange.toString().hashCode
-        layoutId = layoutEntity.getId
-        modelId = layoutEntity.getModel.getId + "_" + pruningSegmentHashCode
-      } catch {
-        case e: Throwable => logWarning(s"Calculate layoutId modelId failed ex:${e.getMessage}")
-      }
-      rel.recomputeDigest()
-      val digestWithoutId = KapRelUtil.getDigestWithoutRelNodeId(rel.getDigest, layoutId, modelId)
-      if (unionLayer >= 1 && TableScanPlan.cacheDf.get.containsKey(digestWithoutId)) {
-        stack.pop()
-        logInfo("Happen Optimized from cache dataframe CacheKey:" + digestWithoutId)
-        TableScanPlan.cacheDf.get.get(digestWithoutId)
-      } else {
-        val df = body
-        if (unionLayer >= 1) {
-          TableScanPlan.cacheDf.get.put(digestWithoutId, df)
-        }
-        df
+        plan.JoinPlan.nonEquiJoin(Seq.apply(left, right), rel, dataContext)
       }
     }
   }
 
   def cleanCache(): Unit = {
-    TableScanPlan.cacheDf.get().clear()
+    TableScanPlan.cachePlan.get().clear()
   }
 
   def getResult(): DataFrame = {
-    stack.pop()
+    val logicalPlan = stack.pop()
+    SparkInternalAgent.getDataFrame(SparderEnv.getSparkSession, logicalPlan)
   }
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
index e2a37a0c7c..2adac31699 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
@@ -22,37 +22,38 @@ package org.apache.kylin.query.runtime
 import java.math.BigDecimal
 import java.sql.Timestamp
 import java.time.ZoneId
+
 import org.apache.calcite.DataContext
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlKind._
 import org.apache.calcite.sql.`type`.{BasicSqlType, IntervalSqlType, SqlTypeFamily, SqlTypeName}
-import org.apache.calcite.sql.fun.SqlDatetimeSubtractionOperator
-import org.apache.calcite.sql.fun.SqlDateTimeDivisionOperator
+import org.apache.calcite.sql.fun.{SqlDateTimeDivisionOperator, SqlDatetimeSubtractionOperator}
 import org.apache.kylin.common.util.DateFormat
 import org.apache.spark.sql.KapFunctions._
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataTypes, DateType, LongType, TimestampType}
-import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.util.SparderTypeUtil
+import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
- /**
-  * Convert RexNode to a nested Column
-  *
-  * @param inputFieldNames  fieldNames
-  * @param rowType     rowtyple
-  * @param dataContext context
-  */
-class SparderRexVisitor(val inputFieldNames: Array[String],
+/**
+ * Convert RexNode to a nested Column
+ *
+ * @param inputFieldNames fieldNames
+ * @param rowType         rowtyple
+ * @param dataContext     context
+ */
+class SparderRexVisitor(val inputFieldNames: Seq[String],
                         val rowType: RelDataType,
                         val dataContext: DataContext)
-    extends RexVisitorImpl[Any](true) {
+  extends RexVisitorImpl[Any](true) {
 
   def this(dfs: Array[DataFrame],
            rowType: RelDataType,
@@ -62,6 +63,10 @@ class SparderRexVisitor(val inputFieldNames: Array[String],
            rowType: RelDataType,
            dataContext: DataContext) = this(Array(df), rowType, dataContext)
 
+  def this(plan: LogicalPlan,
+           rowType: RelDataType,
+           dataContext: DataContext) = this(plan.output.map(c => c.name), rowType, dataContext)
+
   // scalastyle:off
   override def visitCall(call: RexCall): Any = {
 
@@ -297,11 +302,11 @@ class SparderRexVisitor(val inputFieldNames: Array[String],
     val v = convertFilterValueAfterAggr(literal)
     v match {
       case Some(toReturn) => toReturn
-      case None           => null
+      case None => null
     }
   }
 
-   case class MonthNum(num: Column)
+  case class MonthNum(num: Column)
 
   // as underlying schema types for cuboid table are all "string",
   // we rely spark to convert the cuboid col data from string to real type to finish comparing
@@ -313,12 +318,12 @@ class SparderRexVisitor(val inputFieldNames: Array[String],
     literal.getType match {
       case t: IntervalSqlType => {
         if (Seq("MONTH", "YEAR", "QUARTER").contains(
-              t.getIntervalQualifier.timeUnitRange.name)) {
+          t.getIntervalQualifier.timeUnitRange.name)) {
           return Some(
             MonthNum(k_lit(literal.getValue.asInstanceOf[BigDecimal].intValue)))
         }
         if (literal.getType.getFamily
-              .asInstanceOf[SqlTypeFamily] == SqlTypeFamily.INTERVAL_DAY_TIME) {
+          .asInstanceOf[SqlTypeFamily] == SqlTypeFamily.INTERVAL_DAY_TIME) {
           return Some(
             SparderTypeUtil.toSparkTimestamp(
               new java.math.BigDecimal(literal.getValue.toString).longValue()))
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index 2d5d510894..b045f700f0 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -26,6 +26,7 @@ import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryTrace;
 import org.apache.kylin.common.exception.DryRunSucceedException;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
 import org.apache.kylin.query.engine.exec.ExecuteResult;
 import org.apache.kylin.query.engine.exec.sparder.QueryEngine;
 import org.apache.kylin.query.mask.QueryResultMasks;
@@ -35,8 +36,6 @@ import org.apache.spark.sql.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
-
 public class SparkEngine implements QueryEngine {
     private static final Logger log = LoggerFactory.getLogger(SparkEngine.class);
 
@@ -50,10 +49,11 @@ public class SparkEngine implements QueryEngine {
         } finally {
             calciteToSparkPlaner.cleanCache();
         }
-        long takeTime = System.currentTimeMillis() - start;
+        Dataset<Row> df = calciteToSparkPlaner.getResult();
         QueryContext.current().record("to_spark_plan");
+        long takeTime = System.currentTimeMillis() - start;
         log.info("Plan take {} ms", takeTime);
-        return calciteToSparkPlaner.getResult();
+        return df;
     }
 
     @Override
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala
index a44b338cbb..de91678398 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.kylin.query.runtime.plan
 
+import java.util.Locale
+
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rex.RexLiteral
 import org.apache.calcite.sql.SqlKind
@@ -26,19 +28,18 @@ import org.apache.kylin.measure.percentile.PercentileCounter
 import org.apache.kylin.metadata.model.FunctionDesc
 import org.apache.kylin.query.relnode.{KapAggregateRel, KapProjectRel, KylinAggregateCall, OLAPAggregateRel}
 import org.apache.kylin.query.util.RuntimeHelper
-import org.apache.spark.sql.KapFunctions._
+import org.apache.spark.sql.KapFunctions.{k_lit, sum0}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
-import org.apache.spark.sql.catalyst.expressions.{CreateArray, In}
-import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateArray, In}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.execution.utils.SchemaProcessor
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{ArrayType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.udaf.SingleValueAgg
 import org.apache.spark.sql.util.SparderTypeUtil
 
-import java.util.Locale
 import scala.collection.JavaConverters._
 
 // scalastyle:off
@@ -47,12 +48,12 @@ object AggregatePlan extends LogEx {
     List("PERCENTILE", "PERCENTILE_APPROX", "INTERSECT_COUNT", "COUNT_DISTINCT", "BITMAP_UUID",
       FunctionDesc.FUNC_BITMAP_BUILD, FunctionDesc.FUNC_SUM_LC)
 
-  def agg(inputs: java.util.List[DataFrame],
-          rel: KapAggregateRel): DataFrame = logTime("aggregate", debug = true) {
+  def agg(plan: LogicalPlan,
+          rel: KapAggregateRel): LogicalPlan = logTime("aggregate", debug = true) {
+
+    val schemaNames = plan.output
 
-    var dataFrame = inputs.get(0)
-    val schemaNames = dataFrame.schema.fieldNames
-    val groupList = rel.getRewriteGroupKeys.asScala.map(groupId => col(schemaNames.apply(groupId))).toList
+    val groupList = rel.getRewriteGroupKeys.asScala.map(groupId => col(schemaNames.apply(groupId).name)).toList
 
     if (rel.getContext != null && rel.getContext.isExactlyAggregate && !rel.getContext.isNeedToManyDerived) {
       // exactly match, skip agg, direct project.
@@ -60,8 +61,8 @@ object AggregatePlan extends LogEx {
         case (call: KylinAggregateCall, index: Int) =>
           val funcName = OLAPAggregateRel.getAggrFuncName(call);
           val dataType = call.getFunc.getReturnDataType
-          val argNames = 
call.getArgList.asScala.map(dataFrame.schema.names.apply(_))
-          val columnName = argNames.map(col)
+          val argNames = call.getArgList.asScala.map(schemaNames.apply(_).name)
+          val columnName = argNames.map(name => col(name))
           val hash = System.identityHashCode(rel).toString
           funcName match {
             case FunctionDesc.FUNC_COUNT_DISTINCT =>
@@ -70,7 +71,7 @@ object AggregatePlan extends LogEx {
                 KapFunctions.approx_count_distinct_decode(columnName.head, 
dataType.getPrecision).alias(aggName)
               } else if (call.isBitmapCountDistinctFunc) {
                 if (rel.getContext.isExactlyFastBitmap) {
-                  col(schemaNames.apply(call.getArgList.get(0)))
+                  col(schemaNames.apply(call.getArgList.get(0)).name)
                 } else {
                   val aggName = 
SchemaProcessor.replaceToAggravateSchemaName(index, 
"PRECISE_COUNT_DISTINCT_DECODE", hash, argNames: _*)
                   
KapFunctions.precise_count_distinct_decode(columnName.head).alias(aggName)
@@ -90,32 +91,33 @@ object AggregatePlan extends LogEx {
               val sparkDataType = SparderTypeUtil.toSparkType(dataType)
               KapFunctions.k_sum_lc_decode(columnName.head, 
sparkDataType.json).alias(aggName)
             case _ =>
-              col(schemaNames.apply(call.getArgList.get(0)))
+              col(schemaNames.apply(call.getArgList.get(0)).name)
           }
         case (call: Any, _: Int) =>
-          col(schemaNames.apply(call.getArgList.get(0)))
+          col(schemaNames.apply(call.getArgList.get(0)).name)
       }.toList
 
       val prjList = groupList ++ aggCols
       logInfo(s"Query exactly match index, skip agg, project $prjList.")
-      dataFrame.select(prjList: _*)
+      SparkOperation.project(prjList, plan)
     } else {
-      dataFrame = genFiltersWhenIntersectCount(rel, dataFrame)
-      val aggList = buildAgg(dataFrame.schema, rel)
+      val intersectplan = genFiltersWhenIntersectCount(rel, plan)
+      val aggList = buildAgg(intersectplan.output, rel, plan)
       val groupSets = rel.getRewriteGroupSets.asScala
-        .map(groupSet => groupSet.asScala.map(groupId => col(schemaNames.apply(groupId))).toList).toList
-      SparkOperation.agg(AggArgc(dataFrame, groupList, aggList, groupSets, rel.isSimpleGroupType))
+        .map(groupSet => groupSet.asScala.map(groupId => col(schemaNames.apply(groupId).name)).toList).toList
+      SparkOperation.agg(AggArgc(intersectplan, groupList, aggList, groupSets, rel.isSimpleGroupType))
     }
   }
 
-  private def genFiltersWhenIntersectCount(rel: KapAggregateRel, dataFrame: DataFrame): DataFrame = {
+  private def genFiltersWhenIntersectCount(rel: KapAggregateRel, plan: LogicalPlan): LogicalPlan = {
     try {
+      val names = plan.output
+
       val intersects = 
rel.getRewriteAggCalls.asScala.filter(_.isInstanceOf[KylinAggregateCall])
         .filter(!_.asInstanceOf[KylinAggregateCall].getFunc.isCount)
         .map(_.asInstanceOf[KylinAggregateCall])
         .filter(call => !call.getFunc.isCount && 
OLAPAggregateRel.getAggrFuncName(call).equals(FunctionDesc.FUNC_INTERSECT_COUNT))
-      val names = dataFrame.schema.names
-      val children = dataFrame.queryExecution.logical
+      val children = plan
       if (intersects.nonEmpty && intersects.size == 
rel.getRewriteAggCalls.size() && children.isInstanceOf[Project]) {
         // only exists intersect count function in agg
         val list = children.asInstanceOf[Project].projectList
@@ -128,24 +130,28 @@ object AggregatePlan extends LogEx {
           val filters = intersects.map { call =>
             val filterColumnIndex = call.getArgList.get(1)
             val litIndex = call.getArgList.get(2)
-            new Column(In(col(names(filterColumnIndex)).expr, 
list.apply(litIndex).children.head.asInstanceOf[CreateArray].children))
+            new Column(In(col(names(filterColumnIndex).name).expr, 
list.apply(litIndex).children.head.asInstanceOf[CreateArray].children))
           }
           val column = filters.reduceLeft(_.or(_))
-          dataFrame.filter(column)
+
+          val filterPlan = Filter(column.expr, plan)
+          SparkOperation.project(plan.output.map(c => col(c.name)), filterPlan)
         } else {
-          dataFrame
+          plan
         }
       } else {
-        dataFrame
+        plan
       }
     } catch {
       case e: Throwable => logWarning("Error occurred when generate filters", 
e)
-        dataFrame
+        plan
     }
   }
 
-  def buildAgg(schema: StructType,
-               rel: KapAggregateRel): List[Column] = {
+  // S53 Fix https://olapio.atlassian.net/browse/KE-42473
+  def buildAgg(schema: Seq[Attribute],
+               rel: KapAggregateRel,
+               plan: LogicalPlan): List[Column] = {
     val hash = System.identityHashCode(rel).toString
 
     rel.getRewriteAggCalls.asScala.zipWithIndex.map {
@@ -155,8 +161,8 @@ object AggregatePlan extends LogEx {
         val isCount = call.getFunc.isCount
         val funcName =
           if (isCount) FunctionDesc.FUNC_COUNT else 
OLAPAggregateRel.getAggrFuncName(call)
-        val argNames = call.getArgList.asScala.map(schema.names.apply(_))
-        val columnName = argNames.map(col)
+        val argNames = call.getArgList.asScala.map(schema.apply(_).name)
+        val columnName = argNames.map(name => col(name))
         val registeredFuncName = 
RuntimeHelper.registerSingleByColName(funcName, dataType)
         val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, 
funcName, hash, argNames: _*)
         if (funcName == FunctionDesc.FUNC_COUNT_DISTINCT) {
@@ -173,8 +179,9 @@ object AggregatePlan extends LogEx {
           KapFunctions.precise_bitmap_build(columnName.head).alias(aggName)
         } else if 
(funcName.equalsIgnoreCase(FunctionDesc.FUNC_INTERSECT_COUNT)) {
           require(columnName.size >= 3, s"Input columns size 
${columnName.size} don't greater than or equal to 3.")
+          val resolvedPlan = 
SparkInternalAgent.getDataFrame(SparderEnv.getSparkSession, plan)
           val columns = columnName.slice(0, 3).zipWithIndex.map {
-            case (column: Column, 2) => 
column.cast(ArrayType.apply(schema.fields.apply(call.getArgList.get(1)).dataType))
+            case (column: Column, 2) => 
column.cast(ArrayType.apply(resolvedPlan.schema.apply(call.getArgList.get(1)).dataType))
             case (column: Column, _) => column
           }
           val separator = 
s"\\${KylinConfig.getInstanceFromEnv.getIntersectFilterOrSeparator}"
@@ -198,9 +205,8 @@ object AggregatePlan extends LogEx {
         }
       case (call: Any, index: Int) =>
         val funcName = OLAPAggregateRel.getAggrFuncName(call)
-        val schemaNames = schema.names
-        val argNames = call.getArgList.asScala.map(id => schemaNames.apply(id))
-        val columnName = argNames.map(col)
+        val argNames = call.getArgList.asScala.map(id => schema.apply(id).name)
+        val columnName = argNames.map(name => col(name))
         val inputType = call.getType
         val aggName = SchemaProcessor.replaceToAggravateSchemaName(index,
           funcName,
@@ -211,7 +217,11 @@ object AggregatePlan extends LogEx {
             rel.getInput match {
               case projectRel: KapProjectRel =>
                 val percentageArg = 
projectRel.getChildExps.get(call.getArgList.get(1))
-                val accuracyArg = if (call.getArgList.size() < 3) { None } 
else { Some(projectRel.getChildExps.get(call.getArgList.get(2))) }
+                val accuracyArg = if (call.getArgList.size() < 3) {
+                  None
+                } else {
+                  Some(projectRel.getChildExps.get(call.getArgList.get(2)))
+                }
                 (percentageArg, accuracyArg) match {
                   case (percentageLitRex: RexLiteral, accuracyArgLitRex: 
Option[RexLiteral]) =>
                     if 
(KylinConfig.getInstanceFromEnv.getPercentileApproxAlgorithm.equalsIgnoreCase("t-digest"))
 {
@@ -260,7 +270,8 @@ object AggregatePlan extends LogEx {
             
KapFunctions.precise_bitmap_build_pushdown(columnName.head).alias(aggName)
           // Issue 4337: Supported select (select '2012-01-02') as data, xxx 
from table group by xxx
           case SqlKind.SINGLE_VALUE.sql =>
-            
SingleValueAgg(schema.head).apply(col(argNames.head)).alias(aggName)
+            val structField = StructField(schema.head.name, 
SparderTypeUtil.convertSqlTypeToSparkType(inputType), true, Metadata.empty)
+            
SingleValueAgg(structField).apply(col(argNames.head)).alias(aggName)
           case FunctionDesc.FUNC_GROUPING =>
             if (!rel.isSimpleGroupType) {
               grouping(argNames.head).alias(aggName)
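
Note on the hunks above: the aggregate builder now reads column names from the child
LogicalPlan's output attributes (schema.apply(_).name) instead of a StructType, and the
intersect-count branch resolves the array element type from an analyzed DataFrame. A
minimal standalone sketch of those two lookups, assuming stock Spark 3.x Catalyst; it sits
in org.apache.spark.sql only because Dataset.ofRows is private[sql], and every name below
is illustrative rather than part of the patch.

    package org.apache.spark.sql

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.functions.col

    object AggArgLookupSketch {
      // Map Calcite argument ordinals to Spark columns via the child plan's output attributes.
      def argColumns(output: Seq[Attribute], argOrdinals: Seq[Int]): Seq[Column] =
        argOrdinals.map(i => col(output(i).name))

      // Resolve an argument's Spark DataType by analyzing the plan first; the raw
      // LogicalPlan's attributes may still be unresolved at this point.
      def argDataType(spark: SparkSession, plan: LogicalPlan, ordinal: Int) =
        Dataset.ofRows(spark, plan).schema(ordinal).dataType
    }
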
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/FilterPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/FilterPlan.scala
index 727e8bd2ab..494f390796 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/FilterPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/FilterPlan.scala
@@ -17,22 +17,23 @@
  */
 package org.apache.kylin.query.runtime.plan
 
-import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.calcite.DataContext
+import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.query.relnode.KapFilterRel
 import org.apache.kylin.query.runtime.SparderRexVisitor
-import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 
 object FilterPlan extends LogEx {
-  def filter(
-              inputs: java.util.List[DataFrame],
-              rel: KapFilterRel,
-              dataContext: DataContext): DataFrame = logTime("filter", debug = 
true) {
-      val df = inputs.get(0)
-      val visitor = new SparderRexVisitor(df,
-        rel.getInput.getRowType,
-        dataContext)
-      val filterColumn = rel.getCondition.accept(visitor).asInstanceOf[Column]
-      df.filter(filterColumn)
+  def filter(plan: LogicalPlan,
+            rel: KapFilterRel,
+            dataContext: DataContext): LogicalPlan = logTime("filter", debug = 
true) {
+
+    val visitor = new SparderRexVisitor(plan.output.map(_.name),
+      rel.getInput.getRowType,
+      dataContext)
+    val filterColumn = rel.getCondition.accept(visitor).asInstanceOf[Column]
+
+    Filter(filterColumn.expr, plan)
   }
 }
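
The new FilterPlan builds a Catalyst Filter node from the Column produced by
SparderRexVisitor instead of calling DataFrame.filter. A small self-contained sketch of
that pattern on plain Spark (names are illustrative, not from the patch):

    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
    import org.apache.spark.sql.functions.col

    object FilterNodeSketch {
      // Equivalent of df.filter(cond), expressed directly on the logical plan.
      def filter(cond: Column, child: LogicalPlan): LogicalPlan = Filter(cond.expr, child)

      def demo(spark: SparkSession): Unit = {
        import spark.implicits._
        val df: DataFrame = Seq((1, "a"), (2, "b")).toDF("id", "name")
        val plan = filter(col("id") > 1, df.queryExecution.logical)
        println(plan.treeString) // still unresolved; analysis happens when the plan is executed
      }
    }
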
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/JoinPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/JoinPlan.scala
index 0a67247f5d..a43ad2b954 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/JoinPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/JoinPlan.scala
@@ -18,30 +18,32 @@
 package org.apache.kylin.query.runtime.plan
 
 import java.util
+
 import org.apache.calcite.DataContext
 import org.apache.calcite.rex.RexCall
+import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.query.relnode.{KapJoinRel, KapNonEquiJoinRel}
 import org.apache.kylin.query.runtime.SparderRexVisitor
 import org.apache.kylin.query.util.KapRelUtil
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.{Cross, JoinType}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{Column, SparkOperation}
 
 import scala.collection.JavaConverters._
 
-object JoinPlan {
-  def nonEquiJoin(inputs: java.util.List[DataFrame],
-           rel: KapNonEquiJoinRel, dataContext: DataContext): DataFrame = {
-    val lDataFrame = inputs.get(0)
-    val rDataFrame = inputs.get(1)
-    val lSchemaNames = lDataFrame.schema.fieldNames.map("l_" + _)
-    val rSchemaNames = rDataFrame.schema.fieldNames.map("r_" + _)
-    // val schema = statefulDF.indexSchema
-    val newLDataFrame = inputs.get(0).toDF(lSchemaNames: _*)
-    val newRDataFrame = inputs.get(1).toDF(rSchemaNames: _*)
+object JoinPlan extends LogEx {
+  def nonEquiJoin(plans: Seq[LogicalPlan],
+                  rel: KapNonEquiJoinRel, dataContext: DataContext): 
LogicalPlan = {
+    var lPlan = plans.apply(0)
+    var rPlan = plans.apply(1)
+
+    lPlan = SparkOperation.project(lPlan.output.map(c => 
col(c.name).alias("l_" + c.name)), lPlan)
+    rPlan = SparkOperation.project(rPlan.output.map(c => 
col(c.name).alias("r_" + c.name)), rPlan)
     // slice lSchemaNames with rel.getLeftInputSizeBeforeRewrite
     // to strip off the fields added during rewrite
     // as those field will disturb the original index based join condition
-    val visitor = new SparderRexVisitor(Array(lSchemaNames.slice(0, 
rel.getLeftInputSizeBeforeRewrite), rSchemaNames).flatten,
+    val visitor = new SparderRexVisitor(Seq(lPlan.output.map(c => 
c.name).slice(0, rel.getLeftInputSizeBeforeRewrite), rPlan.output.map(c => 
c.name)).flatten,
       null,
       dataContext)
     val pairs = new util.ArrayList[org.apache.kylin.common.util.Pair[Integer, 
Integer]]()
@@ -62,31 +64,31 @@ object JoinPlan {
         equalCond = 
equalCond.and(actRemaining.accept(visitor).asInstanceOf[Column])
       }
 
-      newLDataFrame.join(newRDataFrame, equalCond, rel.getJoinType.lowerName)
+      Join(lPlan, rPlan, joinType = JoinType(rel.getJoinType.lowerName), 
Some(equalCond.expr), JoinHint.NONE)
     } else {
       val conditionExprCol = 
rel.getCondition.accept(visitor).asInstanceOf[Column]
-      newLDataFrame.join(newRDataFrame, conditionExprCol, 
rel.getJoinType.lowerName)
+      Join(lPlan, rPlan, joinType = JoinType(rel.getJoinType.lowerName), 
Some(conditionExprCol.expr), JoinHint.NONE)
     }
   }
 
-  def join(inputs: java.util.List[DataFrame],
-           rel: KapJoinRel): DataFrame = {
+  // scalastyle:off
+  def join(plans: Seq[LogicalPlan],
+           rel: KapJoinRel): LogicalPlan = {
+
+    var lPlan = plans.apply(0)
+    var rPlan = plans.apply(1)
+
+    lPlan = SparkOperation.project(lPlan.output.map(c => 
col(c.name).alias("l_" + c.name)), lPlan)
+    rPlan = SparkOperation.project(rPlan.output.map(c => 
col(c.name).alias("r_" + c.name)), rPlan)
 
-    val lDataFrame = inputs.get(0)
-    val rDataFrame = inputs.get(1)
-    val lSchemaNames = lDataFrame.schema.fieldNames.map("l_" + _)
-    val rSchemaNames = rDataFrame.schema.fieldNames.map("r_" + _)
-    // val schema = statefulDF.indexSchema
-    val newLDataFrame = inputs.get(0).toDF(lSchemaNames: _*)
-    val newRDataFrame = inputs.get(1).toDF(rSchemaNames: _*)
     var joinCol: Column = null
 
     //  todo   utils
     rel.getLeftKeys.asScala
       .zip(rel.getRightKeys.asScala)
       .foreach(tuple => {
-        val col1 = col(lSchemaNames.apply(tuple._1))
-        val col2 = col(rSchemaNames.apply(tuple._2))
+        val col1 = col(lPlan.output.apply(tuple._1).name)
+        val col2 = col(rPlan.output.apply(tuple._2).name)
         val equalCond = makeEqualCond(col1, col2, rel.isJoinCondEqualNullSafe)
 
         if (joinCol == null) {
@@ -96,9 +98,9 @@ object JoinPlan {
         }
       })
     if (joinCol == null) {
-      newLDataFrame.crossJoin(newRDataFrame)
+      Join(lPlan, rPlan, joinType = Cross, None, JoinHint.NONE)
     } else {
-      newLDataFrame.join(newRDataFrame, joinCol, rel.getJoinType.lowerName)
+      Join(lPlan, rPlan, joinType = JoinType(rel.getJoinType.lowerName), 
Some(joinCol.expr), JoinHint.NONE)
     }
   }
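
Both join paths now prefix each side's columns through a Project and emit a Join node
directly, passing the condition's underlying expression and JoinHint.NONE. A hedged sketch
of the same construction against stock Spark 3.x (illustrative names only):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.catalyst.expressions.Alias
    import org.apache.spark.sql.catalyst.plans.JoinType
    import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, LogicalPlan, Project}

    object JoinNodeSketch {
      // Alias each side's attributes with a prefix so the two inputs cannot collide on name.
      def prefixed(prefix: String, plan: LogicalPlan): LogicalPlan =
        Project(plan.output.map(a => Alias(a, prefix + a.name)()), plan)

      // Logical-plan equivalent of left.join(right, cond, "inner" / "left" / ...).
      def join(left: LogicalPlan, right: LogicalPlan, cond: Column, joinType: String): LogicalPlan =
        Join(prefixed("l_", left), prefixed("r_", right),
          JoinType(joinType), Some(cond.expr), JoinHint.NONE)
    }
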
 
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/LimitPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/LimitPlan.scala
index 8669f4c8f6..385b4cec28 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/LimitPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/LimitPlan.scala
@@ -20,27 +20,24 @@ package org.apache.kylin.query.runtime.plan
 import org.apache.calcite.DataContext
 import org.apache.kylin.query.relnode.KapLimitRel
 import org.apache.kylin.query.runtime.SparderRexVisitor
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan, Offset}
 
-// scalastyle:off
 object LimitPlan {
-  def limit(inputs: java.util.List[DataFrame],
+  def limit(plan: LogicalPlan,
             rel: KapLimitRel,
-            dataContext: DataContext): DataFrame = {
-    //    val schema = statefulDF.indexSchema
-    val visitor = new SparderRexVisitor(inputs.get(0),
+            dataContext: DataContext): LogicalPlan = {
+    val visitor = new SparderRexVisitor(plan,
       rel.getInput.getRowType,
       dataContext)
     val limit = BigDecimal(rel.localFetch.accept(visitor).toString).toInt
-    if (rel.localOffset == null) {
-      inputs
-        .get(0)
-        .limit(limit)
-    } else {
+
+    if (rel.localOffset != null) {
       val offset = BigDecimal(rel.localOffset.accept(visitor).toString).toInt
-      inputs
-        .get(0)
-        .offset(offset).limit(limit)
+      val offsetPlan = Offset(Literal(offset), plan)
+      Limit(Literal(limit), offsetPlan)
+    } else {
+      Limit(Literal(limit), plan)
     }
   }
 }
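
LimitPlan now produces Limit and Offset nodes over Literal expressions rather than calling
DataFrame.limit/offset. A sketch of that shape; it assumes a Spark build that ships the
Offset logical operator (newer Spark releases do, and the previous DataFrame.offset call
suggests the bundled Spark already supports it):

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan, Offset}

    object LimitNodeSketch {
      // LIMIT n [OFFSET m] as logical nodes; Limit(...) expands to GlobalLimit(LocalLimit(...)).
      def limit(child: LogicalPlan, n: Int, offset: Option[Int]): LogicalPlan =
        offset match {
          case Some(m) => Limit(Literal(n), Offset(Literal(m), child))
          case None    => Limit(Literal(n), child)
        }
    }
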
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/MinusPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/MinusPlan.scala
index 265c59bea2..e873a03f79 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/MinusPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/MinusPlan.scala
@@ -19,18 +19,16 @@ package org.apache.kylin.query.runtime.plan
 
 import org.apache.calcite.DataContext
 import org.apache.kylin.query.relnode.KapMinusRel
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.{Except, LogicalPlan}
 
 object MinusPlan {
-
-  def minus(
-             inputs: Seq[DataFrame],
-             rel: KapMinusRel,
-             dataContext: DataContext): DataFrame = {
+  def minus(plans: Seq[LogicalPlan],
+            rel: KapMinusRel,
+            dataContext: DataContext): LogicalPlan = {
     if (rel.all) {
-      inputs.reduce((set1, set2) => set1.exceptAll(set2))
+      plans.reduce((p1, p2) => Except(p1, p2, true))
     } else {
-      inputs.reduce((set1, set2) => set1.except(set2))
+      plans.reduce((p1, p2) => Except(p1, p2, false))
     }
   }
 }
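
EXCEPT and EXCEPT ALL map directly onto Catalyst's Except node, with the boolean flag
selecting ALL semantics, as in this illustrative one-method sketch:

    import org.apache.spark.sql.catalyst.plans.logical.{Except, LogicalPlan}

    object MinusNodeSketch {
      // EXCEPT vs EXCEPT ALL over any number of inputs, folded left as in the code above.
      def minus(plans: Seq[LogicalPlan], all: Boolean): LogicalPlan =
        plans.reduce((l, r) => Except(l, r, isAll = all))
    }
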
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ProjectPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ProjectPlan.scala
index 8f25c2312c..76c261a513 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ProjectPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ProjectPlan.scala
@@ -17,26 +17,27 @@
  */
 package org.apache.kylin.query.runtime.plan
 
-import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.calcite.DataContext
 import org.apache.calcite.rex.RexInputRef
+import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.query.relnode.KapProjectRel
 import org.apache.kylin.query.runtime.SparderRexVisitor
-import org.apache.spark.sql.{Column, DataFrame}
-import org.apache.spark.sql.KapFunctions._
+import org.apache.spark.sql.KapFunctions.k_lit
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Column, SparkOperation}
 
 import scala.collection.JavaConverters._
 
-
 object ProjectPlan extends LogEx {
-  def select(inputs: java.util.List[DataFrame],
+
+  def select(plan: LogicalPlan,
              rel: KapProjectRel,
-             dataContext: DataContext): DataFrame = logTime("project", debug = 
true) {
-    val df = inputs.get(0)
+             dataContext: DataContext): LogicalPlan = {
     val duplicatedColumnsCount = collection.mutable.Map[Column, Int]()
+
     val selectedColumns = rel.rewriteProjects.asScala
       .map(rex => {
-        val visitor = new SparderRexVisitor(df,
+        val visitor = new SparderRexVisitor(plan,
           rel.getInput.getRowType,
           dataContext)
         (rex.accept(visitor), rex.isInstanceOf[RexInputRef])
@@ -62,6 +63,6 @@ object ProjectPlan extends LogEx {
         }
       })
 
-    df.select(selectedColumns: _*)
+    SparkOperation.project(selectedColumns, plan)
   }
 }
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 3f23ab26ac..fb7167f03e 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicLong
 import java.{lang, util}
 
-import io.kyligence.kap.secondstorage.SecondStorageUtil
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
@@ -50,6 +49,8 @@ import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`iterator asScala`
 import scala.collection.mutable
 
+import io.kyligence.kap.secondstorage.SecondStorageUtil
+
 // scalastyle:off
 object ResultType extends Enumeration {
   type ResultType = Value
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/SortPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/SortPlan.scala
index 9d8c87db2b..bab7c16b60 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/SortPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/SortPlan.scala
@@ -17,27 +17,25 @@
  */
 package org.apache.kylin.query.runtime.plan
 
-import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.calcite.DataContext
+import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.query.relnode.KapSortRel
 import org.apache.kylin.query.runtime.SparderRexVisitor
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.KapFunctions._
+import org.apache.spark.sql.KapFunctions.k_lit
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, 
NullsFirst, NullsLast, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
 
 import scala.collection.JavaConverters._
 
-
 object SortPlan extends LogEx {
-  def sort(inputs: java.util.List[DataFrame],
+  def sort(plan: LogicalPlan,
            rel: KapSortRel,
-           dataContext: DataContext): DataFrame = logTime("sort", debug = 
true) {
-
-    val dataFrame = inputs.get(0)
+           dataContext: DataContext): LogicalPlan = logTime("sort", debug = 
true) {
 
     val columns = rel.getChildExps.asScala
       .map(rex => {
-        val visitor = new SparderRexVisitor(dataFrame,
-                                            rel.getInput.getRowType,
+        val visitor = new SparderRexVisitor(plan.output.map(_.name),
+          rel.getInput.getRowType,
           dataContext)
         rex.accept(visitor)
       })
@@ -47,33 +45,33 @@ object SortPlan extends LogEx {
         val collation = rel.collation.getFieldCollations.get(pair._2)
 
         /** From Calcite: 
org.apache.calcite.rel.RelFieldCollation.Direction#defaultNullDirection
-          * Returns the null direction if not specified. Consistent with 
Oracle,
-          * NULLS are sorted as if they were positive infinity. */
+         * Returns the null direction if not specified. Consistent with Oracle,
+         * NULLS are sorted as if they were positive infinity. */
         (collation.direction, collation.nullDirection) match {
           case (
-              org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING,
-              
org.apache.calcite.rel.RelFieldCollation.NullDirection.UNSPECIFIED) =>
-            pair._1.asc_nulls_last
+            org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING,
+            
org.apache.calcite.rel.RelFieldCollation.NullDirection.UNSPECIFIED) =>
+            SortOrder(pair._1.expr, Ascending, NullsLast, Seq.empty)
           case (org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING,
-                org.apache.calcite.rel.RelFieldCollation.NullDirection.LAST) =>
-            pair._1.asc_nulls_last
+          org.apache.calcite.rel.RelFieldCollation.NullDirection.LAST) =>
+            SortOrder(pair._1.expr, Ascending, NullsLast, Seq.empty)
           case (org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING,
-                org.apache.calcite.rel.RelFieldCollation.NullDirection.FIRST) 
=>
-            pair._1.asc_nulls_first
+          org.apache.calcite.rel.RelFieldCollation.NullDirection.FIRST) =>
+            SortOrder(pair._1.expr, Ascending, NullsFirst, Seq.empty)
           case (
-              org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING,
-              
org.apache.calcite.rel.RelFieldCollation.NullDirection.UNSPECIFIED) =>
-            pair._1.desc_nulls_first
+            org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING,
+            
org.apache.calcite.rel.RelFieldCollation.NullDirection.UNSPECIFIED) =>
+            SortOrder(pair._1.expr, Descending, NullsFirst, Seq.empty)
           case (org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING,
-                org.apache.calcite.rel.RelFieldCollation.NullDirection.LAST) =>
-            pair._1.desc_nulls_last
+          org.apache.calcite.rel.RelFieldCollation.NullDirection.LAST) =>
+            SortOrder(pair._1.expr, Descending, NullsLast, Seq.empty)
           case (org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING,
-                org.apache.calcite.rel.RelFieldCollation.NullDirection.FIRST) 
=>
-            pair._1.desc_nulls_first
+          org.apache.calcite.rel.RelFieldCollation.NullDirection.FIRST) =>
+            SortOrder(pair._1.expr, Descending, NullsFirst, Seq.empty)
           case _ => throw new IllegalArgumentException
         }
       })
 
-    inputs.get(0).sort(columns: _*)
+    Sort(columns, true, plan)
   }
 }
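
The asc_nulls_last/desc_nulls_first Column helpers are replaced here by explicit SortOrder
expressions feeding a Sort node; the empty fourth argument is sameOrderExpressions. A
compact sketch of the mapping (illustrative names, stock Spark 3.x):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, NullsFirst, NullsLast, SortOrder}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}

    object SortNodeSketch {
      // Equivalent of df.sort(c.asc_nulls_last, d.desc_nulls_first) as a logical node.
      def sort(child: LogicalPlan, ascCol: Column, descCol: Column): LogicalPlan = {
        val order = Seq(
          SortOrder(ascCol.expr, Ascending, NullsLast, Seq.empty),
          SortOrder(descCol.expr, Descending, NullsFirst, Seq.empty))
        Sort(order, global = true, child)
      }
    }
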
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
index 5c0b0511b7..4a06499092 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
@@ -33,12 +33,13 @@ import org.apache.kylin.metadata.tuple.TupleInfo
 import org.apache.kylin.query.implicits.sessionToQueryContext
 import org.apache.kylin.query.relnode.{KapRel, OLAPContext}
 import org.apache.kylin.query.util.{RuntimeHelper, SparderDerivedUtil}
-import org.apache.spark.sql.{Column, DataFrame, Row, SparderEnv, 
SparkOperation, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, 
Union}
 import org.apache.spark.sql.execution.utils.SchemaProcessor
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.manager.SparderLookupManager
 import org.apache.spark.sql.types.{ArrayType, DataTypes, DoubleType, 
StringType, StructField, StructType}
 import org.apache.spark.sql.util.SparderTypeUtil
+import org.apache.spark.sql.{Column, DataFrame, Row, SparderEnv, 
SparkInternalAgent, SparkOperation, SparkSession}
 
 import scala.collection.JavaConverters._
 
@@ -52,20 +53,20 @@ object TableScanPlan extends LogEx {
     r
   }
 
-  private[runtime] val cacheDf: ThreadLocal[ConcurrentHashMap[String, 
DataFrame]] = new ThreadLocal[ConcurrentHashMap[String, DataFrame]] {
-    override def initialValue: ConcurrentHashMap[String, DataFrame] = {
-      new ConcurrentHashMap[String, DataFrame]
+  private[runtime] val cachePlan: ThreadLocal[ConcurrentHashMap[String, 
LogicalPlan]] = new ThreadLocal[ConcurrentHashMap[String, LogicalPlan]] {
+    override def initialValue: ConcurrentHashMap[String, LogicalPlan] = {
+      new ConcurrentHashMap[String, LogicalPlan]
     }
   }
 
-  def createOLAPTable(rel: KapRel): DataFrame = logTime("table scan", debug = 
true) {
+  def createOLAPTable(rel: KapRel): LogicalPlan = logTime("table scan", debug 
= true) {
     val session: SparkSession = SparderEnv.getSparkSession
     val olapContext = rel.getContext
     val context = olapContext.storageContext
     val prunedSegments = context.getPrunedSegments
     val prunedStreamingSegments = context.getPrunedStreamingSegments
     val realizations = olapContext.realization.getRealizations.asScala.toList
-    realizations.map(_.asInstanceOf[NDataflow])
+    val plans = realizations.map(_.asInstanceOf[NDataflow])
       .filter(dataflow => (!dataflow.isStreaming && 
!context.isBatchCandidateEmpty) ||
         (dataflow.isStreaming && !context.isStreamCandidateEmpty) ||
         isSegmentsEmpty(prunedSegments, prunedStreamingSegments))
@@ -75,10 +76,14 @@ object TableScanPlan extends LogEx {
         } else {
           tableScan(rel, dataflow, olapContext, session, prunedSegments, 
context.getCandidate)
         }
-      }).reduce(_.union(_))
+      })
+
+    // Wrap the Union in a Project: Union's own output is only available after analysis,
+    // so projecting the first child's attributes gives downstream operators a readable output.
+    Project(plans.head.output, Union(plans))
   }
 
-  def createMetadataTable(rel: KapRel): DataFrame = {
+  def createMetadataTable(rel: KapRel): LogicalPlan = {
     val session: SparkSession = SparderEnv.getSparkSession
     val olapContext = rel.getContext
     val allFields: util.List[TblColRef] = new util.ArrayList[TblColRef]
@@ -122,13 +127,14 @@ object TableScanPlan extends LogEx {
     })
     val schema: StructType = StructType(structTypes)
 
-    session.createDataFrame(result, schema)
+    session.createDataFrame(result, schema).queryExecution.logical
   }
 
   // prunedSegments is null
-  private def tableScanEmptySegment(rel: KapRel): DataFrame = {
+  private def tableScanEmptySegment(rel: KapRel): LogicalPlan = {
     logInfo("prunedSegments is null")
-    val df = SparkOperation.createEmptyDataFrame(
+    // KE-41874: build the empty DataFrame and return its logical plan
+    SparkOperation.createEmptyDataFrame(
       StructType(rel.getColumnRowType
         .getAllColumns.asScala
         .map(column => StructField(
@@ -136,11 +142,7 @@ object TableScanPlan extends LogEx {
           SparderTypeUtil.toSparkType(column.getType))
         )
       )
-    )
-    val cols = df.schema.map(structField => {
-      col(structField.name)
-    })
-    df.select(cols: _*)
+    ).queryExecution.logical
   }
 
   def isSegmentsEmpty(prunedSegments: util.List[NDataSegment], 
prunedStreamingSegments: util.List[NDataSegment]): Boolean = {
@@ -151,7 +153,7 @@ object TableScanPlan extends LogEx {
 
   def tableScan(rel: KapRel, dataflow: NDataflow, olapContext: OLAPContext,
                 session: SparkSession, prunedSegments: util.List[NDataSegment],
-                candidate: NLayoutCandidate): DataFrame = {
+                candidate: NLayoutCandidate): LogicalPlan = {
     val prunedPartitionMap = olapContext.storageContext.getPrunedPartitions
     olapContext.resetSQLDigest()
     //TODO: refactor
@@ -175,31 +177,31 @@ object TableScanPlan extends LogEx {
     val path = fileList.mkString(",") + olapContext.isExactlyFastBitmap
     printLogInfo(basePath, dataflow.getId, cuboidLayout.getId, prunedSegments, 
prunedPartitionMap)
 
-    val cached = cacheDf.get().getOrDefault(path, null)
-    val df = if (cached != null && 
!cached.sparkSession.sparkContext.isStopped) {
-      logInfo(s"Reuse df: ${cuboidLayout.getId}")
+    val pruningInfo = prunedSegments.asScala.map { seg =>
+      if (prunedPartitionMap != null) {
+        val partitions = prunedPartitionMap.get(seg.getId)
+        seg.getId + ":" + Joiner.on("|").join(partitions)
+      } else {
+        seg.getId
+      }
+    }.mkString(",")
+
+    val cached = cachePlan.get().getOrDefault(path, null)
+    var plan = if (cached != null && 
!SparderEnv.getSparkSession.sparkContext.isStopped) {
+      logInfo(s"Reuse plan: ${cuboidLayout.getId}")
       cached
     } else {
-      val pruningInfo = prunedSegments.asScala.map { seg =>
-        if (prunedPartitionMap != null) {
-          val partitions = prunedPartitionMap.get(seg.getId)
-          seg.getId + ":" + Joiner.on("|").join(partitions)
-        } else {
-          seg.getId
-        }
-      }.mkString(",")
-      val newDf = session.kylin
+      val newPlan = session.kylin
         .isFastBitmapEnabled(olapContext.isExactlyFastBitmap)
         .bucketingEnabled(bucketEnabled(olapContext, cuboidLayout))
         .cuboidTable(dataflow, cuboidLayout, pruningInfo)
-        .toDF(columnNames: _*)
-      logInfo(s"Cache df: ${cuboidLayout.getId}")
-      cacheDf.get().put(path, newDf)
-      newDf
+      cachePlan.get().put(path, newPlan)
+      newPlan
     }
 
-    val (schema, newDF) = buildSchema(df, tableName, cuboidLayout, rel, 
olapContext, dataflow)
-    newDF.select(schema: _*)
+    plan = SparkOperation.projectAsAlias(columnNames, plan)
+    val (schema, newPlan) = buildSchema(plan, tableName, cuboidLayout, rel, 
olapContext, dataflow)
+    SparkOperation.project(schema, newPlan)
   }
 
   def bucketEnabled(context: OLAPContext, layout: LayoutEntity): Boolean = {
@@ -217,9 +219,9 @@ object TableScanPlan extends LogEx {
       && context.getOuterJoinParticipants.iterator().next() == 
layout.getShardByColumnRefs.get(0))
   }
 
-  def buildSchema(df: DataFrame, tableName: String, cuboidLayout: 
LayoutEntity, rel: KapRel,
-                  olapContext: OLAPContext, dataflow: NDataflow): 
(Seq[Column], DataFrame) = {
-    var newDF = df
+  def buildSchema(plan: LogicalPlan, tableName: String, cuboidLayout: 
LayoutEntity, rel: KapRel,
+                  olapContext: OLAPContext, dataflow: NDataflow): 
(Seq[Column], LogicalPlan) = {
+    var newPlan = plan
     val isBatchOfHybrid = 
olapContext.realization.isInstanceOf[HybridRealization] && 
dataflow.getModel.isFusionModel && !dataflow.isStreaming
     val mapping = new NLayoutToGridTableMapping(cuboidLayout, isBatchOfHybrid)
     val context = olapContext.storageContext
@@ -257,7 +259,7 @@ object TableScanPlan extends LogEx {
       olapContext.returnTupleInfo,
       context.getCandidate)
     if (derived.hasDerived) {
-      newDF = derived.joinDerived(newDF)
+      newPlan = derived.joinDerived(newPlan)
     }
     var topNMapping: Map[Int, Column] = Map.empty
     // query will only has one Top N measure.
@@ -266,8 +268,10 @@ object TableScanPlan extends LogEx {
     }
     if (topNMetric.isDefined) {
       val topNFieldIndex = 
mapping.getMetricsIndices(List(topNMetric.get).asJava).head
-      val tp = processTopN(topNMetric.get, newDF, topNFieldIndex, 
olapContext.returnTupleInfo, tableName)
-      newDF = tp._1
+
+      val df = SparkInternalAgent.getDataFrame(SparderEnv.getSparkSession, 
newPlan)
+      val tp = processTopN(topNMetric.get, df, topNFieldIndex, 
olapContext.returnTupleInfo, tableName)
+      newPlan = tp._1.queryExecution.analyzed
       topNMapping = tp._2
     }
     val tupleIdx = getTupleIdx(dimensionsD,
@@ -278,10 +282,9 @@ object TableScanPlan extends LogEx {
       derived,
       tableName,
       rel.getColumnRowType.getAllColumns.asScala.toList,
-      df.schema,
-      gtColIdx,
-      tupleIdx,
-      topNMapping), newDF)
+      newPlan,
+      (gtColIdx, tupleIdx),
+      topNMapping), newPlan)
   }
 
   def toLayoutPath(dataflow: NDataflow, cuboidId: Long, basePath: String, seg: 
NDataSegment): String = {
@@ -313,7 +316,7 @@ object TableScanPlan extends LogEx {
       }.mkString(",")
       logInfo(s"""Path is: 
{"base":"$basePath","dataflow":"${dataflowId}","segments":{$prunedSegmentInfo},"layout":
 ${cuboidId}""")
     }
-    logInfo(s"size is ${cacheDf.get().size()}")
+    logInfo(s"size is ${cachePlan.get().size()}")
   }
 
   private def processTopN(topNMetric: FunctionDesc, df: DataFrame, 
topNFieldIndex: Int, tupleInfo: TupleInfo, tableName: String): (DataFrame, 
Map[Int, Column]) = {
@@ -433,7 +436,7 @@ object TableScanPlan extends LogEx {
     tupleIdx
   }
 
-  def createLookupTable(rel: KapRel): DataFrame = {
+  def createLookupTable(rel: KapRel): LogicalPlan = {
     val start = System.currentTimeMillis()
 
     val olapContext = rel.getContext
@@ -447,17 +450,18 @@ object TableScanPlan extends LogEx {
     val snapshotResPath = 
tableMetadataManager.getTableDesc(lookupTableName).getLastSnapshotPath
     val config = instance.getConfig
     val dataFrameTableName = instance.getProject + "@" + lookupTableName
-    val lookupDf = SparderLookupManager.getOrCreate(dataFrameTableName, 
snapshotResPath, config)
+    val lookupPlan = SparderLookupManager.getOrCreate(dataFrameTableName, 
snapshotResPath, config)
 
     val olapTable = olapContext.firstTableScan.getOlapTable
     val alisTableName = olapContext.firstTableScan.getBackupAlias
-    val newNames = lookupDf.schema.fieldNames.map { name =>
-      val gTInfoSchema = SchemaProcessor.parseDeriveTableSchemaName(name)
-      SchemaProcessor.generateDeriveTableSchemaName(alisTableName,
+    val newNames = lookupPlan.output.map { c =>
+      val gTInfoSchema = SchemaProcessor.parseDeriveTableSchemaName(c.name)
+      val name = SchemaProcessor.generateDeriveTableSchemaName(alisTableName,
         gTInfoSchema.columnId,
         gTInfoSchema.columnName)
-    }.array
-    val newNameLookupDf = lookupDf.toDF(newNames: _*)
+      name
+    }
+    val newNameLookupPlan = SparkOperation.projectAsAlias(newNames, lookupPlan)
     val colIndex = olapTable.getSourceColumns.asScala
       .map(
         column =>
@@ -473,9 +477,9 @@ object TableScanPlan extends LogEx {
                 )
             )
           })
-    val df = newNameLookupDf.select(colIndex: _*)
+    val plan = SparkOperation.project(colIndex, newNameLookupPlan)
     logInfo(s"Gen lookup table scan cost Time :${System.currentTimeMillis() - 
start} ")
-    df
+    plan
   }
 
   private def expandDerived(layoutCandidate: NLayoutCandidate,
@@ -498,10 +502,10 @@ object TableScanPlan extends LogEx {
     expanded
   }
 
-  def createSingleRow(): DataFrame = {
+  def createSingleRow(): LogicalPlan = {
     val session = SparderEnv.getSparkSession
     val rows = List.fill(1)(Row.fromSeq(List[Object]()))
     val rdd = session.sparkContext.makeRDD(rows)
-    session.createDataFrame(rdd, StructType(List[StructField]()))
+    session.createDataFrame(rdd, 
StructType(List[StructField]())).queryExecution.logical
   }
 }
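
The Project-over-Union trick described in the comment above can be reproduced in
isolation: projecting the first child's attributes gives parent operators a usable output
before the analyzer has run. A hedged sketch (names are illustrative):

    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}

    object UnionScanSketch {
      // Union the per-dataflow scans, then re-project the first child's attributes so that
      // parent operators can read `output` without waiting for analysis.
      def unionAll(plans: Seq[LogicalPlan]): LogicalPlan =
        if (plans.size == 1) plans.head
        else Project(plans.head.output, Union(plans))
    }
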
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/UnionPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/UnionPlan.scala
index 4530c42a26..eaac2027f5 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/UnionPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/UnionPlan.scala
@@ -19,27 +19,21 @@ package org.apache.kylin.query.runtime.plan
 
 import org.apache.calcite.DataContext
 import org.apache.kylin.query.relnode.KapUnionRel
-import org.apache.spark.sql.DataFrame
-
-import scala.collection.JavaConverters._
+import org.apache.spark.sql.SparkOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LogicalPlan, 
Union}
+import org.apache.spark.sql.functions.col
 
 object UnionPlan {
-
-  def union(
-      inputs: java.util.List[DataFrame],
-      rel: KapUnionRel,
-      dataContext: DataContext): DataFrame = {
-    var df = inputs.get(0)
-    val drop = inputs.asScala.drop(1)
-    if (rel.all) {
-      for (other <- drop) {
-        df = df.union(other)
-      }
+  def union(plans: Seq[LogicalPlan],
+            rel: KapUnionRel,
+            dataContext: DataContext): LogicalPlan = {
+    val unionPlan = if (rel.all) {
+      Union(plans)
     } else {
-      for (other <- drop) {
-        df = df.union(other).distinct()
-      }
+      val uPlan = Union(plans)
+      Deduplicate(plans.head.output, uPlan)
     }
-    df
+
+    SparkOperation.project(plans.head.output.map(c => col(c.name)), unionPlan)
   }
 }
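
For UNION (without ALL) the chained distinct() calls become a single Deduplicate over the
unioned output, followed by the same output-pinning Project. Sketch of both branches
(illustrative names):

    import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LogicalPlan, Project, Union}

    object UnionNodeSketch {
      def union(plans: Seq[LogicalPlan], all: Boolean): LogicalPlan = {
        val unioned = Union(plans)
        val body = if (all) unioned else Deduplicate(plans.head.output, unioned)
        Project(plans.head.output, body)
      }
    }
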
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ValuesPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ValuesPlan.scala
index ef9269a88a..53d9a6c8f2 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ValuesPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ValuesPlan.scala
@@ -18,27 +18,28 @@
 package org.apache.kylin.query.runtime.plan
 
 import org.apache.kylin.query.relnode.KapValuesRel
-import org.apache.spark.sql.{DataFrame, Row, SparkOperation}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.SparderTypeUtil
 
 import scala.collection.JavaConverters._
 
 object ValuesPlan {
-  def values(rel: KapValuesRel): DataFrame = {
+  def values(rel: KapValuesRel): LogicalPlan = {
 
     val schema = StructType(rel.getRowType.getFieldList.asScala.map { field =>
       StructField(
         field.getName,
         SparderTypeUtil.convertSqlTypeToSparkType(field.getType))
     })
+    val output = schema.map(f => AttributeReference(f.name, f.dataType, 
f.nullable, f.metadata)())
+
     val rows = rel.tuples.asScala.map { tp =>
       Row.fromSeq(tp.asScala.map(lit => 
SparderTypeUtil.getValueFromRexLit(lit)))
-    }.asJava
-    if (rel.tuples.size() == 0) {
-      SparkOperation.createEmptyDataFrame(schema)
-    } else {
-      SparkOperation.createConstantDataFrame(rows, schema)
     }
+
+    LocalRelation.fromExternalRows(output, rows)
   }
 }
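
VALUES is now a LocalRelation built from external rows, which also covers the empty-tuple
case that previously needed createEmptyDataFrame. A standalone sketch with a hypothetical
two-column schema (not taken from the patch):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object ValuesNodeSketch {
      // A two-column VALUES relation; an empty `rows` sequence yields an empty LocalRelation.
      def values(rows: Seq[Row]): LogicalPlan = {
        val schema = StructType(Seq(StructField("ID", IntegerType), StructField("NAME", StringType)))
        val output = schema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
        LocalRelation.fromExternalRows(output, rows)
      }
    }
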
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/WindowPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/WindowPlan.scala
index 7a7fcb46b8..38581a4963 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/WindowPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/WindowPlan.scala
@@ -19,27 +19,29 @@ package org.apache.kylin.query.runtime.plan
 
 import java.sql.Date
 import java.util.{Calendar, Locale}
+
 import org.apache.calcite.DataContext
 import org.apache.calcite.rel.RelCollationImpl
 import org.apache.calcite.rel.RelFieldCollation.Direction
 import org.apache.calcite.rex.RexInputRef
 import org.apache.calcite.util.NlsString
 import org.apache.kylin.common.util.DateFormat
+import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.query.relnode.{KapProjectRel, KapWindowRel}
 import org.apache.kylin.query.runtime.SparderRexVisitor
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.KapFunctions._
+import org.apache.spark.sql.KapFunctions.k_lit
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.expressions.{Window, WindowSpec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.LongType
 import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.{Column, SparkOperation}
 
 import scala.collection.JavaConverters._
 
-object WindowPlan extends Logging {
+object WindowPlan extends LogEx {
   // the function must have sort
   val sortSpecified =
     List("CUME_DIST", "LEAD", "RANK", "DENSE_RANK", "ROW_NUMBER", "NTILE", 
"LAG")
@@ -54,23 +56,23 @@ object WindowPlan extends Logging {
     "LEAD"
   )
 
-  def window(input: java.util.List[DataFrame],
-             rel: KapWindowRel, datacontex: DataContext): DataFrame = {
+  def window(plan: LogicalPlan,
+             rel: KapWindowRel,
+             datacontex: DataContext): LogicalPlan = {
     val start = System.currentTimeMillis()
 
     var windowCount = 0
     rel.groups.asScala.head.upperBound
-    val df = input.get(0)
-    val columnSize = df.schema.length
+    val columnSize = plan.output.size
 
-    val columns = df.schema.fieldNames.map(col)
+    val columns = plan.output.map(c => col(c.name))
     val constantMap = rel.getConstants.asScala
       .map(_.getValue)
       .zipWithIndex
       .map { entry =>
         (entry._2 + columnSize, entry._1)
       }.toMap[Int, Any]
-    val visitor = new SparderRexVisitor(df,
+    val visitor = new SparderRexVisitor(plan,
       rel.getInput.getRowType,
       datacontex)
     val constants = rel.getConstants.asScala
@@ -238,9 +240,9 @@ object WindowPlan extends Logging {
 
       }
     val selectColumn = columns ++ windows
-    val window = df.select(selectColumn: _*)
+    val windowPlan = SparkOperation.project(selectColumn, plan)
     logInfo(s"Gen window cost Time :${System.currentTimeMillis() - start} ")
-    window
+    windowPlan
   }
 
   // scalastyle:off
@@ -259,7 +261,7 @@ object WindowPlan extends Logging {
               col.expr.prettyName.toUpperCase(Locale.ROOT) match {
                 case "CURRENT_DATE" =>
                   DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN)
-                      .format(DateTimeUtils.currentTimestamp() / 1000)
+                    .format(DateTimeUtils.currentTimestamp() / 1000)
                 case _ => col.expr
               }
             }
@@ -271,6 +273,7 @@ object WindowPlan extends Logging {
       }
     }
   }
+
   def constantValue(value: Any) = {
     value match {
       case v: NlsString => v.getValue
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala
index 2109b0ecdb..50fd84bb7f 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala
@@ -26,9 +26,10 @@ import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.utils.SchemaProcessor
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataTypes}
 import org.apache.spark.sql.udf.UdfManager
 
 import scala.collection.JavaConverters._
@@ -51,16 +52,17 @@ object RuntimeHelper extends Logging {
     name
   }
 
-  def gtSchemaToCalciteSchema(
-                               primaryKey: ImmutableBitSet,
-                               derivedUtil: SparderDerivedUtil,
-                               factTableName: String,
-                               allColumns: List[TblColRef],
-                               sourceSchema: StructType,
-                               gtColIdx: Array[Int],
-                               tupleIdx: Array[Int],
-                               topNMapping: Map[Int, Column]
-                             ): Seq[Column] = {
+  // S53 Fix https://olapio.atlassian.net/browse/KE-42473
+  def gtSchemaToCalciteSchema(primaryKey: ImmutableBitSet,
+                              derivedUtil: SparderDerivedUtil,
+                              factTableName: String,
+                              allColumns: List[TblColRef],
+                              plan: LogicalPlan,
+                              colIdx: (Array[Int], Array[Int]),
+                              topNMapping: Map[Int, Column]): Seq[Column] = {
+    val gtColIdx = colIdx._1
+    val tupleIdx = colIdx._2
+    val sourceSchema = plan.output
     val gTInfoNames = SchemaProcessor.buildFactTableSortNames(sourceSchema)
     val calciteToGTinfo = tupleIdx.zipWithIndex.toMap
     var deriveMap: Map[Int, Column] = Map.empty
@@ -98,7 +100,12 @@ object RuntimeHelper extends Logging {
 
     val projectConfig = 
NProjectManager.getProjectConfig(derivedUtil.model.getProject)
     // may have multi TopN measures.
-    val topNIndexs = 
sourceSchema.fields.map(_.dataType).zipWithIndex.filter(_._1.isInstanceOf[ArrayType])
+    val topNIndexs = if (plan.resolved) {
+      
sourceSchema.map(_.dataType).zipWithIndex.filter(_._1.isInstanceOf[ArrayType])
+    } else {
+      Seq.empty
+    }
+
     allColumns.indices
       .zip(allColumns)
       .map {
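
The plan.resolved guard added above matters because unresolved attributes do not expose a
dataType (they raise an UnresolvedException), so the TopN detection can only inspect a
resolved plan. An illustrative sketch of the guard:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.types.ArrayType

    object TopNIndexSketch {
      // Indices of array-typed output columns, but only when the plan is resolved;
      // touching dataType on an unresolved attribute would throw.
      def topNIndices(plan: LogicalPlan): Seq[Int] =
        if (plan.resolved) {
          plan.output.zipWithIndex.collect { case (a, i) if a.dataType.isInstanceOf[ArrayType] => i }
        } else Seq.empty
    }
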
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/SparderDerivedUtil.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/SparderDerivedUtil.scala
index 67834c7d29..a8b38dccb3 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/SparderDerivedUtil.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/SparderDerivedUtil.scala
@@ -18,23 +18,25 @@
 
 package org.apache.kylin.query.util
 
+import java.util
+
+import org.apache.calcite.sql.SqlKind
 import org.apache.kylin.guava30.shaded.common.collect.Maps
 import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate
 import org.apache.kylin.metadata.cube.model.NDataSegment
-import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager}
-import org.apache.kylin.metadata.model.util.scd2.SCD2NonEquiCondSimplification
-import org.apache.calcite.sql.SqlKind
 import org.apache.kylin.metadata.model.DeriveInfo.DeriveType
 import 
org.apache.kylin.metadata.model.NonEquiJoinCondition.SimplifiedNonEquiJoinCondition
-import org.apache.kylin.metadata.model.{DeriveInfo, TblColRef}
+import org.apache.kylin.metadata.model.util.scd2.SCD2NonEquiCondSimplification
+import org.apache.kylin.metadata.model.{DeriveInfo, NDataModel, 
NTableMetadataManager, TblColRef}
 import org.apache.kylin.metadata.tuple.TupleInfo
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, 
LogicalPlan}
 import org.apache.spark.sql.derived.DerivedInfo
 import org.apache.spark.sql.execution.utils.{DeriveTableColumnInfo, 
SchemaProcessor}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.manager.SparderLookupManager
-import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.{Column, SparkOperation}
 
-import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -62,8 +64,8 @@ case class SparderDerivedUtil(gtInfoTableName: String,
     .reverse
     .foreach(entry => findDerivedColumn(entry._1, entry._2))
 
-  val derivedColumnNameMapping: util.HashMap[DerivedInfo, Array[String]] =
-    Maps.newHashMap[DerivedInfo, Array[String]]()
+  val derivedColumnNameMapping: util.HashMap[DerivedInfo, Seq[String]] =
+    Maps.newHashMap[DerivedInfo, Seq[String]]()
 
   def findDerivedColumn(hostColIds: util.List[Integer], deriveInfo: 
DeriveInfo): Unit = {
 
@@ -140,23 +142,23 @@ case class SparderDerivedUtil(gtInfoTableName: String,
     cuboidIdx
   }
 
-  def joinDerived(dataFrame: DataFrame): DataFrame = {
-    var joinedDf: DataFrame = dataFrame
+  def joinDerived(plan: LogicalPlan): LogicalPlan = {
+    var joinedPlan: LogicalPlan = plan
     val joinedLookups = scala.collection.mutable.Set[String]()
 
     for (hostToDerived <- hostToDeriveds) {
       if (hostToDerived.deriveType != DeriveType.PK_FK) {
         //  PK_FK derive does not need joining
         if (!joinedLookups.contains(hostToDerived.aliasTableName)) {
-          joinedDf = joinLookUpTable(dataFrame.schema.fieldNames,
-            joinedDf,
+          joinedPlan = joinLookUpTable(plan.output.map(c => c.name).toArray,
+            joinedPlan,
             hostToDerived)
           joinedLookups.add(hostToDerived.aliasTableName)
         }
       }
     }
 
-    joinedDf
+    joinedPlan
   }
 
   def getLookupTablePathAndPkIndex(deriveInfo: DeriveInfo): (String, String, 
String, Array[Int]) = {
@@ -179,28 +181,25 @@ case class SparderDerivedUtil(gtInfoTableName: String,
   }
 
   def joinLookUpTable(gTInfoNames: Array[String],
-                      df: DataFrame,
-                      derivedInfo: DerivedInfo): DataFrame = {
+                      plan: LogicalPlan,
+                      derivedInfo: DerivedInfo): LogicalPlan = {
     val lookupTableAlias = derivedInfo.aliasTableName
 
-    val lookupDf =
+    val lookupPlan =
       SparderLookupManager.getOrCreate(derivedInfo.tableIdentity,
         derivedInfo.path,
         dataSeg.getConfig)
 
-    val newNames = lookupDf.schema.fieldNames
-      .map { name =>
-        SchemaProcessor.parseDeriveTableSchemaName(name)
-      }
+    val newNames = lookupPlan.output
+      .map(c => SchemaProcessor.parseDeriveTableSchemaName(c.name))
       .sortBy(_.columnId)
       .map(
         deriveInfo =>
           DeriveTableColumnInfo(lookupTableAlias,
             deriveInfo.columnId,
             deriveInfo.columnName).toString)
-      .array
     derivedColumnNameMapping.put(derivedInfo, newNames)
-    val newNameLookupDf = lookupDf.toDF(newNames: _*)
+    val newNameLookupPlan = SparkOperation.projectAsAlias(newNames, lookupPlan)
     if (derivedInfo.fkIdx.length != derivedInfo.pkIdx.length) {
       throw new IllegalStateException(
         s"unequal host key num ${derivedInfo.fkIdx.length} " +
@@ -224,13 +223,12 @@ case class SparderDerivedUtil(gtInfoTableName: String,
       val simplifiedCond = SCD2NonEquiCondSimplification.INSTANCE
         .convertToSimplifiedSCD2Cond(derivedInfo.join)
         .getSimplifiedNonEquiJoinConditions
-      joinCol = genNonEquiJoinColumn(newNameLookupDf, gTInfoNames, joinCol, 
simplifiedCond.asScala)
+      joinCol = genNonEquiJoinColumn(newNameLookupPlan, gTInfoNames, joinCol, 
simplifiedCond.asScala)
     }
-    df.join(newNameLookupDf, joinCol, derivedInfo.join.getType)
-
+    Join(plan, newNameLookupPlan, JoinType.apply(derivedInfo.join.getType), 
Option(joinCol.expr), JoinHint.NONE)
   }
 
-  def genNonEquiJoinColumn(newNameLookupDf: DataFrame,
+  def genNonEquiJoinColumn(newNameLookupPlan: LogicalPlan,
                            gTInfoNames: Array[String],
                            colOrigin: Column,
                            simplifiedConds: 
mutable.Buffer[SimplifiedNonEquiJoinCondition]): Column = {
@@ -238,7 +236,7 @@ case class SparderDerivedUtil(gtInfoTableName: String,
     var joinCol = colOrigin
     for (simplifiedCond <- simplifiedConds) {
       val colFk = 
col(gTInfoNames.apply(indexOnTheCuboidValues(simplifiedCond.getFk)))
-      val colPk = 
col(newNameLookupDf.schema.fieldNames.apply(simplifiedCond.getPk.getColumnDesc.getZeroBasedIndex))
+      val colPk = 
col(newNameLookupPlan.output.apply(simplifiedCond.getPk.getColumnDesc.getZeroBasedIndex).name)
       val colOp = simplifiedCond.getOp
 
       val newCol = colOp match {
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
index db5e9b41ae..8b393c2d63 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
@@ -23,12 +23,15 @@ import java.sql.Timestamp
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataflow, 
NDataflowManager}
 import org.apache.kylin.metadata.model.FusionModelManager
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.datasource.storage.StorageStoreFactory
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StructType
 
 import scala.collection.mutable.{HashMap => MutableHashMap}
 
+import io.kyligence.kap.secondstorage.SecondStorage
+
 class KylinDataFrameManager(sparkSession: SparkSession) {
   private var extraOptions = new MutableHashMap[String, String]()
   private var userSpecifiedSchema: Option[StructType] = None
@@ -70,7 +73,7 @@ class KylinDataFrameManager(sparkSession: SparkSession) {
     option("bucketingEnabled", bucketingEnabled)
   }
 
-  def cuboidTable(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: 
String): DataFrame = {
+  def cuboidTable(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: 
String): LogicalPlan = {
     format("parquet")
     option("project", dataflow.getProject)
     option("dataflowId", dataflow.getUuid)
@@ -85,18 +88,24 @@ class KylinDataFrameManager(sparkSession: SparkSession) {
 
       val partition = 
dataflow.getModel.getPartitionDesc.getPartitionDateColumnRef
       val id = layout.getOrderedDimensions.inverse().get(partition)
-      var df = read(dataflow, layout, pruningInfo)
+      var plan = read(dataflow, layout, pruningInfo)
       if (id != null && end != Long.MinValue) {
-        df = df.filter(col(id.toString).geq(new Timestamp(end)))
+        val filterPlan = Filter(col(id.toString).geq(new Timestamp(end)).expr, 
plan)
+        plan = SparkOperation.project(filterPlan.output.map(c => col(c.name)), 
filterPlan)
       }
-      return df
+      return plan
     }
     read(dataflow, layout, pruningInfo)
   }
 
-  def read(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: String): 
DataFrame = {
+  def read(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: String): 
LogicalPlan = {
+    val df = SecondStorage.trySecondStorage(sparkSession, dataflow, layout, 
pruningInfo)
+    if (df.isEmpty) {
       StorageStoreFactory.create(dataflow.getModel.getStorageType)
         .read(dataflow, layout, sparkSession, extraOptions.toMap)
+    } else {
+      df.get.queryExecution.analyzed
+    }
   }
 
   /**
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparkOperation.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparkOperation.scala
index 1865f7ab79..1b8cbb9a7d 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparkOperation.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparkOperation.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GroupingSets
-import org.apache.spark.sql.catalyst.plans.logical.Aggregate
-import org.apache.spark.sql.functions.{count, lit}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, 
LogicalPlan, Project}
+import org.apache.spark.sql.functions.{col, count, lit}
 import org.apache.spark.sql.types.StructType
 
 object SparkOperation {
@@ -61,27 +61,51 @@ object SparkOperation {
     SparderEnv.getSparkSession.createDataFrame(rows, structType)
   }
 
-  def agg(aggArgc: AggArgc): DataFrame = {
+  def project(cols: Seq[Column], plan: LogicalPlan): LogicalPlan = {
+    Project(cols.map(_.named), plan)
+  }
+
+  def projectAsAlias(newNames: Seq[String], plan: LogicalPlan): LogicalPlan = {
+    val newCols = plan.output.zip(newNames).map { case (oldAttribute, newName) 
=>
+      col(oldAttribute.name).as(newName)
+    }
+    project(newCols, plan)
+  }
+
+  def filter(condition: Column, plan: LogicalPlan): LogicalPlan = {
+    Filter(condition.named, plan)
+  }
+
+  def agg(aggArgc: AggArgc): LogicalPlan = {
     if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty && 
!aggArgc.isSimpleGroup && aggArgc.groupSets.nonEmpty) {
-      Dataset.ofRows(
-        aggArgc.dataFrame.sparkSession,
-        Aggregate(
-          Seq(GroupingSets(aggArgc.groupSets.map(gs => gs.map(_.expr)),
-            aggArgc.group.map(_.expr))),
-          aggArgc.group.map(_.named) ++ aggArgc.agg.map(_.named),
-          aggArgc.dataFrame.queryExecution.logical
-        )
+      Aggregate(
+        Seq(GroupingSets(aggArgc.groupSets.map(gs => gs.map(_.expr)),
+          aggArgc.group.map(_.expr))),
+        aggArgc.group.map(_.named) ++ aggArgc.agg.map(_.named),
+        aggArgc.plan
       )
     } else if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty) {
-      aggArgc.dataFrame
-        .groupBy(aggArgc.group: _*)
-        .agg(aggArgc.agg.head, aggArgc.agg.drop(1): _*)
+      Aggregate(
+        aggArgc.group.map(_.expr),
+        (aggArgc.group ++ aggArgc.agg).map(_.named),
+        aggArgc.plan
+      )
     } else if (aggArgc.agg.isEmpty && aggArgc.group.nonEmpty) {
-      aggArgc.dataFrame.groupBy(aggArgc.group: 
_*).agg(count(lit("1"))).select(aggArgc.group: _*)
+      val aggPlan = Aggregate(
+        aggArgc.group.map(_.expr),
+        (aggArgc.group ++ Seq.apply(count(lit("1")))).map(_.named),
+        aggArgc.plan
+      )
+
+      Project(aggArgc.group.map(_.named), aggPlan)
     } else if (aggArgc.agg.nonEmpty && aggArgc.group.isEmpty) {
-      aggArgc.dataFrame.agg(aggArgc.agg.head, aggArgc.agg.drop(1): _*)
+      Aggregate(
+        Nil,
+        aggArgc.agg.map(_.named),
+        aggArgc.plan
+      )
     } else {
-      aggArgc.dataFrame
+      aggArgc.plan
     }
   }
 
@@ -102,4 +126,8 @@ object SparkOperation {
   */
 }
 
-case class AggArgc(dataFrame: DataFrame, group: List[Column], agg: 
List[Column], groupSets: List[List[Column]], isSimpleGroup: Boolean)
\ No newline at end of file
+case class AggArgc(plan: LogicalPlan,
+                   group: List[Column],
+                   agg: List[Column],
+                   groupSets: List[List[Column]],
+                   isSimpleGroup: Boolean)
\ No newline at end of file
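
The reworked agg() assembles Aggregate nodes directly; turning the finished plan back into
a DataFrame still needs Dataset.ofRows, which is private[sql] (SparkInternalAgent
presumably wraps exactly that). A sketch under that assumption, with illustrative names:

    package org.apache.spark.sql

    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
    import org.apache.spark.sql.functions.sum

    object AggExecSketch {
      // GROUP BY one column with a single SUM, built as an Aggregate node and then executed.
      def run(spark: SparkSession, child: LogicalPlan, groupCol: Column, sumCol: Column): DataFrame = {
        val plan = Aggregate(
          Seq(groupCol.expr),
          Seq(groupCol.named, sum(sumCol).as("S").named),
          child)
        Dataset.ofRows(spark, plan) // analysis and execution happen when the Dataset is materialized
      }
    }
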
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/execution/utils/SchemaProcessor.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/execution/utils/SchemaProcessor.scala
index caec2e369a..161850016c 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/execution/utils/SchemaProcessor.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/execution/utils/SchemaProcessor.scala
@@ -17,18 +17,19 @@
  */
 package org.apache.spark.sql.execution.utils
 
+import java.util
+
 import org.apache.kylin.common.util.ImmutableBitSet
 import org.apache.kylin.common.{KapConfig, KylinConfig}
-
-import java.util
 import org.apache.kylin.metadata.cube.gridtable.NLayoutToGridTableMapping
 import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, 
NDataflow, NDataflowManager}
-import org.apache.kylin.query.runtime.plan.TableScanPlan
 import org.apache.kylin.metadata.model.{ColumnDesc, FunctionDesc}
-import org.apache.spark.sql.{LayoutEntityConverter, SparkSession}
+import org.apache.kylin.query.runtime.plan.TableScanPlan
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparderConstants.COLUMN_NAME_SEPARATOR
 import org.apache.spark.sql.util.{SparderConstants, SparderTypeUtil}
+import org.apache.spark.sql.{LayoutEntityConverter, SparkSession}
 
 import scala.collection.JavaConverters._
 
@@ -37,7 +38,7 @@ object SchemaProcessor {
 
   def buildGTSchema(cuboid: LayoutEntity,
                     mapping: NLayoutToGridTableMapping,
-                    tableName: String):Seq[String] = {
+                    tableName: String): Seq[String] = {
 
     genColumnNames(tableName, cuboid, mapping)
   }
@@ -79,7 +80,7 @@ object SchemaProcessor {
           SparderTypeUtil.toSparkType(function.getReturnDataType, true)
         }
       case "COUNT" => LongType
-      case x if x.startsWith("TOP_N")  =>
+      case x if x.startsWith("TOP_N") =>
         val fields = function.getParameters.asScala.drop(1).map(p =>
           StructField(s"DIMENSION_${p.getColRef.getName}", 
SparderTypeUtil.toSparkType(p.getColRef.getType))
         )
@@ -97,7 +98,7 @@ object SchemaProcessor {
       case "COLLECT_SET" =>
         val parameter = function.getParameters.get(0)
         ArrayType(SparderTypeUtil.toSparkType(parameter.getColRef.getType))
-      case _ =>  SparderTypeUtil.toSparkType(function.getReturnDataType)
+      case _ => SparderTypeUtil.toSparkType(function.getReturnDataType)
     }
   }
 
@@ -113,7 +114,7 @@ object SchemaProcessor {
      val path: String = TableScanPlan.toLayoutPath(df, nCuboidLayout.getId, base, latestReadySegment)
      val schema: StructType = sparkSession.read.parquet(path).schema
      val schemaFromNCuboidLayout: StructType = LayoutEntityConverter.genCuboidSchemaFromNCuboidLayout(nCuboidLayout)
-      if (!(schema == StructType.removeMetadata("__CHAR_VARCHAR_TYPE_STRING",schemaFromNCuboidLayout))) {
+      if (!(schema == StructType.removeMetadata("__CHAR_VARCHAR_TYPE_STRING", schemaFromNCuboidLayout))) {
        throw new RuntimeException(s"Check schema failed : dfName: $dfName, layoutId: ${nCuboidLayout.getId}, actual: ${schemaFromNCuboidLayout.treeString}, expect: ${schema.treeString}")
       }
     }
@@ -147,12 +148,11 @@ object SchemaProcessor {
     AggColumnInfo(index, aggFuncName, hash, aggArgs: _*).toString
   }
 
-  def buildFactTableSortNames(sourceSchema: StructType): Array[String] = {
-    sourceSchema.fieldNames
-      .filter(name => name.startsWith("F" + COLUMN_NAME_SEPARATOR) || 
name.startsWith("R" + COLUMN_NAME_SEPARATOR))
-      .map(name => (factTableSchemaNameToColumnId(name), name))
+  def buildFactTableSortNames(sourceSchema: Seq[Attribute]): Array[String] = {
+    sourceSchema.filter(att => att.name.startsWith("F" + 
COLUMN_NAME_SEPARATOR) || att.name.startsWith("R" + COLUMN_NAME_SEPARATOR))
+      .map(att => (factTableSchemaNameToColumnId(att.name), att.name))
       .sortBy(_._1)
-      .map(_._2)
+      .map(_._2).toArray
   }
 
   def buildSchemaWithRawTable(columnDescs: Array[ColumnDesc]): StructType = {
@@ -165,8 +165,8 @@ object SchemaProcessor {
   }
 
   def genTopNSchema(advanceTableName: String,
-                     colId: Int,
-                     columnName: String = "N"): String = {
+                    colId: Int,
+                    columnName: String = "N"): String = {
     TopNColumnInfo(advanceTableName, colId, columnName).toString
   }
 
@@ -188,14 +188,14 @@ sealed abstract class ColumnInfo(tableName: String,
 case class FactTableCulumnInfo(tableName: String,
                                columnId: Int,
                                columnName: String)
-    extends ColumnInfo(tableName, columnId, columnName) {
+  extends ColumnInfo(tableName, columnId, columnName) {
   override val prefix: String = "F"
 }
 
 case class DeriveTableColumnInfo(tableName: String,
                                  columnId: Int,
                                  columnName: String)
-    extends ColumnInfo(tableName, columnId, columnName) {
+  extends ColumnInfo(tableName, columnId, columnName) {
   override val prefix: String = "D"
 }
 
@@ -204,7 +204,7 @@ case class AggColumnInfo(index: Int,
                          hash: String,
                          args: String*) {
   override def toString: String =
-    s"$funcName(${args.mkString("_")})_${index}_$hash"
+    s"$funcName${args.mkString("_")}_${index}_$hash"
 }
 
 case class TopNColumnInfo(tableName: String, columnId: Int, columnName: String)
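
buildFactTableSortNames now works on the plan's output attributes (Seq[Attribute]) rather than on a StructType, but the ordering logic is unchanged: keep only the "F"/"R"-prefixed fact-table columns, then sort by the numeric column id embedded in the name. A standalone sketch of that sort, with an assumed name layout and a stand-in parser instead of SparderConstants and factTableSchemaNameToColumnId:

    object FactColumnOrderSketch {
      // Assumed attribute-name layout: "<F|R>_<columnId>_<columnName>" (separator is a stand-in).
      private val Sep = "_"

      private def columnId(name: String): Int = name.split(Sep)(1).toInt

      def sortNames(names: Seq[String]): Array[String] =
        names
          .filter(n => n.startsWith("F" + Sep) || n.startsWith("R" + Sep))
          .map(n => (columnId(n), n))   // pair each name with its numeric column id
          .sortBy(_._1)                 // numeric order, not lexicographic
          .map(_._2)
          .toArray
    }

    // FactColumnOrderSketch.sortNames(Seq("F_10_PRICE", "F_2_ID", "R_1_NAME"))
    //   => Array("R_1_NAME", "F_2_ID", "F_10_PRICE")
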
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/manager/SparderLookupManager.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/manager/SparderLookupManager.scala
index 2ee6551ada..a9e3634cb6 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/manager/SparderLookupManager.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/manager/SparderLookupManager.scala
@@ -17,43 +17,28 @@
  */
 package org.apache.spark.sql.manager
 
-import java.util.concurrent.TimeUnit
-import org.apache.kylin.guava30.shaded.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
+import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.{KapConfig, KylinConfig}
-import org.apache.kylin.metadata.model.NTableMetadataManager
-import org.apache.kylin.metadata.model.ColumnDesc
+import org.apache.kylin.metadata.model.{ColumnDesc, NTableMetadataManager}
+import org.apache.kylin.query.util.PartitionsFilter.{PARTITIONS, PARTITION_COL}
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.utils.DeriveTableColumnInfo
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparderEnv}
 import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.kylin.query.util.PartitionsFilter.PARTITION_COL
-import org.apache.kylin.query.util.PartitionsFilter.PARTITIONS
+import org.apache.spark.sql.{SparderEnv, SparkOperation}
 
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.ListBuffer
 
 // scalastyle:off
 object SparderLookupManager extends Logging {
-  val DEFAULT_MAXSIZE = 100
-  val DEFAULT_EXPIRE_TIME = 1
-  val DEFAULT_TIME_UNIT = TimeUnit.HOURS
-
-  val sourceCache: Cache[String, Dataset[Row]] = CacheBuilder.newBuilder
-    .maximumSize(DEFAULT_MAXSIZE)
-    .expireAfterWrite(DEFAULT_EXPIRE_TIME, DEFAULT_TIME_UNIT)
-    .removalListener(new RemovalListener[String, Dataset[Row]]() {
-      override def onRemoval(
-                              notification: RemovalNotification[String, Dataset[Row]]): Unit = {
-        logInfo("Remove lookup table from spark : " + notification.getKey)
-        notification.getValue.unpersist()
-      }
-    })
-    .build
-    .asInstanceOf[Cache[String, Dataset[Row]]]
 
   def create(name: String,
              sourcePath: String,
-             kylinConfig: KylinConfig): Dataset[Row] = {
+             kylinConfig: KylinConfig): LogicalPlan = {
     val names = name.split("@")
     val projectName = names.apply(0)
     val tableName = names.apply(1)
@@ -67,7 +52,6 @@ object SparderLookupManager extends Logging {
     }
     val dfTableName = Integer.toHexString(System.identityHashCode(name))
 
-
     val orderedCol = new ListBuffer[(ColumnDesc, Int)]
     var partitionCol: (ColumnDesc, Int) = null
     for ((col, index) <- tableDesc.getColumns.zipWithIndex) {
@@ -84,24 +68,31 @@ object SparderLookupManager extends Logging {
      options.put(PARTITIONS, String.join(",", tableDesc.getSnapshotPartitions.keySet()))
      options.put("mapreduce.input.pathFilter.class", "org.apache.kylin.query.util.PartitionsFilter")
    }
+
    val originSchema = StructType(orderedCol.map { case (col, index) => StructField(col.getName, SparderTypeUtil.toSparkType(col.getType)) })
-    val schema = StructType(orderedCol.map { case (col, index) => StructField(DeriveTableColumnInfo(dfTableName, index, col.getName).toString, SparderTypeUtil.toSparkType(col.getType)) })
-    val resourcePath = KapConfig.getInstanceFromEnv.getReadHdfsWorkingDirectory + sourcePath
+    val aliasCols = orderedCol.map {
+      case (c, index) =>
+        col(c.getName).as(DeriveTableColumnInfo(dfTableName, index, c.getName).toString)
+    }
+    val resourcePath = new Path(KapConfig.getInstanceFromEnv.getReadHdfsWorkingDirectory + sourcePath)
+
+    val sparkSession = SparderEnv.getSparkSession
 
-    SparderEnv.getSparkSession.read.options(options)
-      .schema(originSchema)
-      .parquet(resourcePath)
-      .toDF(schema.fieldNames: _*)
+    val fileIndex = new InMemoryFileIndex(sparkSession, Seq(resourcePath), options.toMap, Option(originSchema))
+    val fsRelation = HadoopFsRelation(
+      fileIndex,
+      partitionSchema = fileIndex.partitionSchema,
+      dataSchema = originSchema,
+      bucketSpec = None,
+      new ParquetFileFormat,
+      options.toMap)(sparkSession)
+    val plan = LogicalRelation(fsRelation)
+    SparkOperation.project(aliasCols, plan)
   }
 
   def getOrCreate(name: String,
                   sourcePath: String,
-                  kylinConfig: KylinConfig): DataFrame = {
-    val value = sourceCache.getIfPresent(sourcePath)
-    if (value != null) {
-      value
-    } else {
-      create(name, sourcePath, kylinConfig)
-    }
+                  kylinConfig: KylinConfig): LogicalPlan = {
+    create(name, sourcePath, kylinConfig)
   }
 }
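
With the snapshot cache removed, SparderLookupManager.create builds a Catalyst relation directly: an InMemoryFileIndex over the snapshot path, wrapped in a HadoopFsRelation and exposed as a LogicalRelation, with the derived column aliases applied via SparkOperation.project. A minimal sketch of that construction against Spark 3.x internals (the schema and path below are placeholders, not Kylin's):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
    import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object ParquetRelationSketch {
      // Expose a parquet directory as a Catalyst plan node instead of a DataFrame.
      def parquetPlan(spark: SparkSession, dir: String): LogicalPlan = {
        val schema = StructType(Seq(              // placeholder lookup-table schema
          StructField("ID", IntegerType),
          StructField("NAME", StringType)))
        val index = new InMemoryFileIndex(spark, Seq(new Path(dir)), Map.empty[String, String], Some(schema))
        val relation = HadoopFsRelation(
          location = index,
          partitionSchema = index.partitionSchema,
          dataSchema = schema,
          bucketSpec = None,
          fileFormat = new ParquetFileFormat,
          options = Map.empty[String, String])(spark)
        LogicalRelation(relation)                 // downstream operators compose on this plan
      }
    }
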
diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java
index 52bfa2de28..03a56b08df 100644
--- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java
+++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.kylin.query.sql;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap;
 import org.apache.kylin.junit.annotation.MetadataInfo;
 import org.apache.kylin.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
@@ -26,16 +27,16 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.TimeRange;
 import org.apache.spark.sql.KylinDataFrameManager;
 import org.apache.spark.sql.SparkSession;
-import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.springframework.util.ReflectionUtils;
 
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap;
-
 import lombok.val;
 import lombok.var;
 
+@Disabled("Not for open source")
 @MetadataInfo(project = "streaming_test")
 class KylinDataFrameManagerTest {
 
@@ -45,7 +46,7 @@ class KylinDataFrameManagerTest {
         val config = KylinConfig.getInstanceFromEnv();
        val dataflowManager = NDataflowManager.getInstance(config, "streaming_test");
        var dataflow = dataflowManager.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
-        Assert.assertTrue(dataflow.isStreaming() && dataflow.getModel().isFusionModel());
+        Assertions.assertTrue(dataflow.isStreaming() && dataflow.getModel().isFusionModel());
 
         val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss));
         kylinDataFrameManager.option("isFastBitmapEnabled", "false");
@@ -56,14 +57,15 @@ class KylinDataFrameManagerTest {
            ImmutableBiMap.Builder<Integer, TblColRef> dimsBuilder = ImmutableBiMap.builder();
            ImmutableBiMap<Integer, TblColRef> orderedDimensions = dimsBuilder.put(1, partitionTblCol).build();
            Mockito.when(layoutEntity.getOrderedDimensions()).thenReturn(orderedDimensions);
-            val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120");
-            Assert.assertEquals(1, df.columns().length);
+            val plan = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity,
+                    "3e560d22-b749-48c3-9f64-d4230207f120");
+            Assertions.assertEquals(1, plan.output().size());
         }
         {
             // condition: id == null
-            val df = kylinDataFrameManager.cuboidTable(dataflow, new LayoutEntity(),
+            val plan = kylinDataFrameManager.cuboidTable(dataflow, new LayoutEntity(),
                     "3e560d22-b749-48c3-9f64-d4230207f120");
-            Assert.assertEquals(0, df.columns().length);
+            Assertions.assertEquals(0, plan.output().size());
         }
 
         {
@@ -87,12 +89,13 @@ class KylinDataFrameManagerTest {
                        ReflectionUtils.setField(field, timeRange, Long.MIN_VALUE);
                         seg.setTimeRange(timeRange);
                     } catch (Exception e) {
-                        Assert.fail(e.getMessage());
+                        Assertions.fail(e.getMessage());
                     }
                 });
             });
-            val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120");
-            Assert.assertEquals(1, df.columns().length);
+            val plan = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity,
+                    "3e560d22-b749-48c3-9f64-d4230207f120");
+            Assertions.assertEquals(1, plan.output().size());
         }
         ss.stop();
     }
@@ -103,13 +106,14 @@ class KylinDataFrameManagerTest {
         val config = KylinConfig.getInstanceFromEnv();
        val dataflowManager = NDataflowManager.getInstance(config, "streaming_test");
        val dataflow = dataflowManager.getDataflow("cd2b9a23-699c-4699-b0dd-38c9412b3dfd");
-        Assert.assertFalse(dataflow.isStreaming());
+        Assertions.assertFalse(dataflow.isStreaming());
         val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss));
         kylinDataFrameManager.option("isFastBitmapEnabled", "false");
         val layoutEntity = new LayoutEntity();
         {
-            val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "86b5daaa-e295-4e8c-b877-f97bda69bee5");
-            Assert.assertEquals(0, df.columns().length);
+            val plan = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity,
+                    "86b5daaa-e295-4e8c-b877-f97bda69bee5");
+            Assertions.assertEquals(0, plan.output().size());
         }
         ss.stop();
     }
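
The assertions change from df.columns().length to plan.output().size() because cuboidTable now returns a LogicalPlan, whose columns are its output attributes. A tiny illustration of that equivalence (placeholder data, not the Kylin test fixture):

    import org.apache.spark.sql.SparkSession

    object PlanOutputSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("plan-output-sketch").getOrCreate()
        import spark.implicits._
        val df = Seq((1, "a"), (2, "b")).toDF("ID", "NAME")
        val plan = df.queryExecution.analyzed            // the LogicalPlan behind the DataFrame
        assert(plan.output.size == df.columns.length)    // both report 2 columns here
        spark.stop()
      }
    }
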
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala
index 03e0577013..b1ce087474 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala
@@ -18,33 +18,33 @@
 
 package org.apache.kylin.query.runtime.plan
 
+import java.util
+
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
 import org.junit.Assert
 
-import java.util
-
 class SegmentEmptyTest extends SparderBaseFunSuite with SharedSparkSession with LocalMetadata {
 
-    val prunedSegment1 = null
-    val prunedSegment2 = new util.LinkedList[NDataSegment]
-    val prunedSegment3 = new util.LinkedList[NDataSegment]
-    prunedSegment3.add(new NDataSegment())
+  val prunedSegment1 = null
+  val prunedSegment2 = new util.LinkedList[NDataSegment]
+  val prunedSegment3 = new util.LinkedList[NDataSegment]
+  prunedSegment3.add(new NDataSegment())
 
-    val prunedStreamingSegment1 = null
-    val prunedStreamingSegment2 = new util.LinkedList[NDataSegment]
-    val prunedStreamingSegment3 = new util.LinkedList[NDataSegment]
-    prunedStreamingSegment3.add(new NDataSegment())
+  val prunedStreamingSegment1 = null
+  val prunedStreamingSegment2 = new util.LinkedList[NDataSegment]
+  val prunedStreamingSegment3 = new util.LinkedList[NDataSegment]
+  prunedStreamingSegment3.add(new NDataSegment())
 
-    Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment1))
-    Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment2))
-    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment3))
+  Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment1))
+  Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment2))
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment3))
 
-    Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment1))
-    Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment2))
-    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment3))
+  Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment1))
+  Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment2))
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment3))
 
-    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment1))
-    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment2))
-    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment3))
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment1))
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment2))
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment3))
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/sql/SparkInternalAgent.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparkInternalAgent.scala
similarity index 100%
rename from src/spark-project/engine-spark/src/main/scala/org/apache/spark/sql/SparkInternalAgent.scala
rename to src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparkInternalAgent.scala
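
SparkInternalAgent only moves from engine-spark to spark-common here; its contents are unchanged. For context, such a helper has to live under package org.apache.spark.sql because Dataset.ofRows is private[sql]; a hedged sketch of that kind of bridge (names are illustrative, not the actual file):

    package org.apache.spark.sql

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    object LogicalPlanBridgeSketch {
      // Turn an already-built Catalyst plan back into a queryable DataFrame.
      def toDataFrame(spark: SparkSession, plan: LogicalPlan): DataFrame =
        Dataset.ofRows(spark, plan)
    }
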
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala
index 4d05368d6e..bd416345f2 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala
@@ -18,13 +18,16 @@
 
 package org.apache.spark.sql.datasource.storage
 
+import java.util.concurrent.Executors
+import java.util.{Objects, List => JList}
+import java.{lang, util}
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KapConfig
 import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
-import org.apache.kylin.engine.spark.utils.StorageUtils.findCountDistinctMeasure
-import org.apache.kylin.engine.spark.utils.{Metrics, Repartitioner, StorageUtils}
+import org.apache.kylin.engine.spark.utils.{Metrics, StorageUtils}
 import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.LayoutEntityConverter._
@@ -38,21 +41,17 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.util.ThreadUtils
 
-import java.util.concurrent.Executors
-import java.util.{Objects, List => JList}
-import java.{lang, util}
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{ExecutionContext, Future}
 
-case class WriteTaskStats(
-                           numPartitions: Int,
-                           numFiles: Long,
-                           numBytes: Long,
-                           numRows: Long,
-                           sourceRows: Long,
-                           numBucket: Int,
-                           partitionValues: JList[String])
+case class WriteTaskStats(numPartitions: Int,
+                          numFiles: Long,
+                          numBytes: Long,
+                          numRows: Long,
+                          sourceRows: Long,
+                          numBucket: Int,
+                          partitionValues: JList[String])
 
 abstract class StorageStore extends Logging {
 
@@ -62,22 +61,25 @@ abstract class StorageStore extends Logging {
 
   def setStorageListener(listener: StorageListener): Unit = storageListener = Some(listener)
 
-  def save(
-            layout: LayoutEntity,
-            outputPath: Path,
-            kapConfig: KapConfig,
-            dataFrame: DataFrame): WriteTaskStats
+  def save(layout: LayoutEntity,
+           outputPath: Path,
+           kapConfig: KapConfig,
+           dataFrame: DataFrame): WriteTaskStats
 
-  def read(
-            dataflow: NDataflow, layout: LayoutEntity, sparkSession: SparkSession,
-            extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame
+  def read(dataflow: NDataflow,
+           layout: LayoutEntity,
+           sparkSession: SparkSession,
+           extraOptions: Map[String, String] = Map.empty[String, String]): LogicalPlan
 
-  def readSpecialSegment(
-                          segment: NDataSegment, layout: LayoutEntity, sparkSession: SparkSession,
-                          extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame
+  def readSpecialSegment(segment: NDataSegment,
+                         layout: LayoutEntity,
+                         sparkSession: SparkSession,
+                         extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame
 
-  def readSpecialSegment(
-                          segment: NDataSegment, layout: LayoutEntity, partitionId: java.lang.Long, sparkSession: SparkSession): DataFrame
+  def readSpecialSegment(segment: NDataSegment,
+                         layout: LayoutEntity,
+                         partitionId: java.lang.Long,
+                         sparkSession: SparkSession): DataFrame
 
   def collectFileCountAndSizeAfterSave(outputPath: Path, conf: Configuration): (Long, Long) = {
     val fs = outputPath.getFileSystem(conf)
@@ -125,23 +127,22 @@ class StorageStoreV1 extends StorageStore {
     LayoutFormatWriter.write(afterReplaced, layoutEntity, outputPath, kapConfig, storageListener)
   }
 
-
   override def read(dataflow: NDataflow, layout: LayoutEntity, sparkSession: SparkSession,
-                    extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame = {
+                    extraOptions: Map[String, String] = Map.empty[String, String]): LogicalPlan = {
     val structType = if ("true".equals(extraOptions.apply("isFastBitmapEnabled"))) {
       layout.toExactlySchema()
     } else {
       layout.toSchema()
     }
     val indexCatalog = new FilePruner(sparkSession, options = extraOptions, structType)
-    sparkSession.baseRelationToDataFrame(
-      HadoopFsRelation(
-        indexCatalog,
-        partitionSchema = indexCatalog.partitionSchema,
-        dataSchema = indexCatalog.dataSchema.asNullable,
-        bucketSpec = None,
-        new ParquetFileFormat,
-        options = extraOptions)(sparkSession))
+    val fsRelation = HadoopFsRelation(
+      indexCatalog,
+      partitionSchema = indexCatalog.partitionSchema,
+      dataSchema = indexCatalog.dataSchema.asNullable,
+      bucketSpec = None,
+      new ParquetFileFormat,
+      options = extraOptions)(sparkSession)
+    LogicalRelation(fsRelation)
   }
 
   override def readSpecialSegment(
diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/JobSupport.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/JobSupport.scala
index 5b0d01f6fb..66192a14ca 100644
--- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/JobSupport.scala
+++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/JobSupport.scala
@@ -47,6 +47,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
 
 import scala.collection.JavaConverters._
 
+
 trait JobSupport
   extends BeforeAndAfterAll
     with BeforeAndAfterEach
@@ -147,9 +148,9 @@ trait JobSupport
 
   @throws[Exception]
   protected def buildSegment(cubeName: String,
-                           segmentRange: SegmentRange[_ <: Comparable[_]],
-                           toBuildLayouts: java.util.Set[LayoutEntity],
-                           prj: String): NDataSegment = {
+                             segmentRange: SegmentRange[_ <: Comparable[_]],
+                             toBuildLayouts: java.util.Set[LayoutEntity],
+                             prj: String): NDataSegment = {
     val config: KylinConfig = KylinConfig.getInstanceFromEnv
     val dsMgr: NDataflowManager = NDataflowManager.getInstance(config, prj)
     val execMgr: NExecutableManager =

