This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 9db3502f80265753b38b4ecbb0ee57f7871b6466 Author: Mingming Ge <7mmi...@gmail.com> AuthorDate: Mon Aug 7 19:02:02 2023 +0800 KYLIN-5774 Optimize Calcite plan to convert spark logical plan --- pom.xml | 4 + .../org/apache/kylin/common/KylinConfigBase.java | 4 - .../src/main/resources/kylin-defaults0.properties | 1 - .../kylin/newten/EnhancedAggPushDownTest.java | 2 +- .../kylin/newten/SpecialColumnNamesTest.java | 104 ++++++++++++++ .../apache/kylin/query/engine/QueryExecTest.java | 59 +------- .../_global/project/special_column_names.json | 35 +++++ .../c41390c5-b35d-4db3-b167-029874b85a2c.json | 13 ++ .../c41390c5-b35d-4db3-b167-029874b85a2c.json | 63 ++++++++ .../c41390c5-b35d-4db3-b167-029874b85a2c.json | 158 +++++++++++++++++++++ .../special_column_names/table/SSB.CUSTOMER.json | 68 +++++++++ .../table/SSB.P_LINEORDER.json | 118 +++++++++++++++ .../kylin/query/runtime/CalciteToSparkPlaner.scala | 121 +++++----------- .../kylin/query/runtime/SparderRexVisitor.scala | 37 ++--- .../apache/kylin/query/runtime/SparkEngine.java | 8 +- .../kylin/query/runtime/plan/AggregatePlan.scala | 87 +++++++----- .../kylin/query/runtime/plan/FilterPlan.scala | 25 ++-- .../apache/kylin/query/runtime/plan/JoinPlan.scala | 58 ++++---- .../kylin/query/runtime/plan/LimitPlan.scala | 25 ++-- .../kylin/query/runtime/plan/MinusPlan.scala | 14 +- .../kylin/query/runtime/plan/ProjectPlan.scala | 19 +-- .../kylin/query/runtime/plan/ResultPlan.scala | 3 +- .../apache/kylin/query/runtime/plan/SortPlan.scala | 52 ++++--- .../kylin/query/runtime/plan/TableScanPlan.scala | 118 +++++++-------- .../kylin/query/runtime/plan/UnionPlan.scala | 30 ++-- .../kylin/query/runtime/plan/ValuesPlan.scala | 15 +- .../kylin/query/runtime/plan/WindowPlan.scala | 29 ++-- .../apache/kylin/query/util/RuntimeHelper.scala | 31 ++-- .../kylin/query/util/SparderDerivedUtil.scala | 52 ++++--- .../apache/spark/sql/KylinDataFrameManager.scala | 19 ++- .../org/apache/spark/sql/SparkOperation.scala | 64 ++++++--- .../sql/execution/utils/SchemaProcessor.scala | 36 ++--- .../spark/sql/manager/SparderLookupManager.scala | 69 ++++----- .../kylin/query/sql/KylinDataFrameManagerTest.java | 32 +++-- .../query/runtime/plan/SegmentEmptyTest.scala | 38 ++--- .../org/apache/spark/sql/SparkInternalAgent.scala | 0 .../sql/datasource/storage/StorageStore.scala | 73 +++++----- .../scala/org/apache/kylin/common/JobSupport.scala | 7 +- 38 files changed, 1104 insertions(+), 587 deletions(-) diff --git a/pom.xml b/pom.xml index 71763ea249..dac240b005 100644 --- a/pom.xml +++ b/pom.xml @@ -1509,6 +1509,10 @@ <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite</artifactId> + </exclusion> </exclusions> <scope>provided</scope> </dependency> diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 5bfe8c262a..c1dd5379e3 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1906,10 +1906,6 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(this.getOptional("kylin.query.schema-cache-enabled", FALSE)); } - public boolean isDataFrameCacheEnabled() { - return Boolean.parseBoolean(this.getOptional("kylin.query.dataframe-cache-enabled", TRUE)); - } - public boolean 
enableReplaceDynamicParams() { return Boolean.parseBoolean(this.getOptional("kylin.query.replace-dynamic-params-enabled", FALSE)); } diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties b/src/core-common/src/main/resources/kylin-defaults0.properties index b10ef59365..56cd346578 100644 --- a/src/core-common/src/main/resources/kylin-defaults0.properties +++ b/src/core-common/src/main/resources/kylin-defaults0.properties @@ -232,7 +232,6 @@ kylin.query.join-match-optimization-enabled=false kylin.query.convert-count-distinct-expression-enabled=false kylin.query.memory-limit-during-collect-mb=5400 kylin.query.cartesian-partition-num-threshold-factor=100 -kylin.query.dataframe-cache-enabled=true # spark context canary to monitor spark kylin.canary.sqlcontext-enabled=false diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java index a10ab79acb..b5f3646ebd 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/EnhancedAggPushDownTest.java @@ -131,7 +131,7 @@ public class EnhancedAggPushDownTest extends NLocalWithSparkSessionTest { queryExec.executeQuery(sql); } catch (Exception e) { Assert.assertTrue(e instanceof SQLException); - Assert.assertTrue(queryExec.getSparderQueryOptimizedExceptionMsg().contains("Path does not exist")); + Assert.assertTrue(queryExec.getSparderQueryOptimizedExceptionMsg().contains("does not exist")); } return ContextUtil.listContexts(); } diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SpecialColumnNamesTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SpecialColumnNamesTest.java new file mode 100644 index 0000000000..f78bc35983 --- /dev/null +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SpecialColumnNamesTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.newten; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.util.Shell; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; +import org.apache.kylin.query.engine.QueryExec; +import org.apache.kylin.util.ExecAndComp; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.StaticSQLConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import lombok.val; + +public class SpecialColumnNamesTest extends NLocalWithSparkSessionTest { + + @BeforeClass + public static void initSpark() { + if (Shell.MAC) + overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");//for snappy + if (ss != null && !ss.sparkContext().isStopped()) { + ss.stop(); + } + sparkConf = new SparkConf().setAppName(UUID.randomUUID().toString()).setMaster("local[4]"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer"); + sparkConf.set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), "in-memory"); + sparkConf.set("spark.sql.shuffle.partitions", "1"); + sparkConf.set("spark.memory.fraction", "0.1"); + // opt memory + sparkConf.set("spark.shuffle.detectCorrupt", "false"); + // For sinai_poc/query03, enable implicit cross join conversion + sparkConf.set("spark.sql.crossJoin.enabled", "true"); + sparkConf.set("spark.sql.adaptive.enabled", "false"); + sparkConf.set("spark.sql.autoBroadcastJoinThreshold", "1"); + ss = SparkSession.builder().config(sparkConf).getOrCreate(); + SparderEnv.setSparkSession(ss); + } + + @Before + public void setup() throws IOException { + overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1"); + this.createTestMetadata("src/test/resources/ut_meta/special_column_names"); + NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject()); + scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv())); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + } + + @After + public void after() throws Exception { + NDefaultScheduler.destroyInstance(); + cleanupTestMetadata(); + } + + public String getProject() { + return "special_column_names"; + } + + @Test + public void testSpecialColumnNames() throws Exception { + fullBuild("c41390c5-b35d-4db3-b167-029874b85a2c"); + + populateSSWithCSVData(getTestConfig(), getProject(), ss); + List<Pair<String, String>> query = new ArrayList<>(); + String sql = "select count(1), \"//LO_LINENUMBER\", \"LO_CU//STKEY\" from SSB.P_LINEORDER group by \"//LO_LINENUMBER\", \"LO_CU//STKEY\""; + query.add(Pair.newPair("special_column_names", sql)); + ExecAndComp.execAndCompare(query, getProject(), ExecAndComp.CompareLevel.SAME, "default"); + + QueryExec queryExec = new QueryExec(getProject(), KylinConfig.getInstanceFromEnv()); + val resultSet = queryExec.executeQuery(sql); + Assert.assertEquals(140, resultSet.getRows().size()); + } +} diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecTest.java index a13cd7fb6a..ba34dc387c 100644 --- 
a/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/query/engine/QueryExecTest.java @@ -20,16 +20,10 @@ package org.apache.kylin.query.engine; import java.sql.SQLException; -import org.apache.calcite.rel.RelNode; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.Unsafe; -import org.apache.kylin.query.engine.meta.SimpleDataContext; -import org.apache.kylin.query.relnode.KapRel; -import org.apache.kylin.query.runtime.CalciteToSparkPlaner; -import org.apache.kylin.query.util.QueryContextCutter; import org.apache.kylin.query.util.QueryHelper; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; @@ -40,10 +34,8 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.springframework.security.util.FieldUtils; import lombok.val; -import scala.Function0; public class QueryExecTest extends NLocalFileMetadataTestCase { @@ -70,7 +62,8 @@ public class QueryExecTest extends NLocalFileMetadataTestCase { } /** - * <p>See also {@link org.apache.kylin.query.engine.QueryExecTest#testSumCaseWhenHasNull()} + * <p>See also {@link org.apache.kylin.query.engine.QueryExecTest#testSumCaseWhenHasNull()} + * * @throws SQLException */ @Test @@ -90,6 +83,7 @@ public class QueryExecTest extends NLocalFileMetadataTestCase { * <code>case COUNT(null) when 0 then null else SUM0(null) end</code>, which is incompatible with model section * * <p>See also {@link org.apache.kylin.query.engine.QueryExecTest#testWorkWithoutKapAggregateReduceFunctionsRule()} + * * @throws SqlParseException */ @Test @@ -140,51 +134,4 @@ public class QueryExecTest extends NLocalFileMetadataTestCase { } } } - - @Test - public void testSparkPlanWithoutCache() throws IllegalAccessException, SqlParseException { - overwriteSystemProp("kylin.query.dataframe-cache-enabled", "false"); - String sql = "select count(*) from TEST_KYLIN_FACT group by seller_id"; - QueryExec qe = new QueryExec(project, KylinConfig.getInstanceFromEnv()); - RelNode root = qe.parseAndOptimize(sql); - SimpleDataContext dataContext = (SimpleDataContext) FieldUtils.getFieldValue(qe, "dataContext"); - QueryContextCutter.selectRealization(root, BackdoorToggles.getIsQueryFromAutoModeling()); - CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext) { - @Override - public Dataset<Row> actionWithCache(KapRel rel, Function0<Dataset<Row>> body) { - throw new RuntimeException(); - } - }; - try { - calciteToSparkPlaner.go(root.getInput(0)); - } finally { - calciteToSparkPlaner.cleanCache(); - } - } - - @Test - public void testSparkPlanWithCache() throws IllegalAccessException, SqlParseException { - overwriteSystemProp("kylin.query.dataframe-cache-enabled", "true"); - String sql = "select count(*) from TEST_KYLIN_FACT group by seller_id"; - QueryExec qe = new QueryExec(project, KylinConfig.getInstanceFromEnv()); - RelNode root = qe.parseAndOptimize(sql); - SimpleDataContext dataContext = (SimpleDataContext) FieldUtils.getFieldValue(qe, "dataContext"); - QueryContextCutter.selectRealization(root, BackdoorToggles.getIsQueryFromAutoModeling()); - CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext) { - @Override - public Dataset<Row> actionWithCache(KapRel rel, Function0<Dataset<Row>> 
body) { - throw new IllegalStateException(); - } - }; - Exception expectException = null; - try { - calciteToSparkPlaner.go(root.getInput(0)); - } catch (Exception e) { - expectException = e; - } finally { - calciteToSparkPlaner.cleanCache(); - Assert.assertTrue(expectException instanceof IllegalStateException); - } - } - } diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/_global/project/special_column_names.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/_global/project/special_column_names.json new file mode 100644 index 0000000000..c7500a3bc8 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/_global/project/special_column_names.json @@ -0,0 +1,35 @@ +{ + "uuid" : "24854c55-8298-623e-e64a-42ae6faf1f1a", + "last_modified" : 1677855913210, + "create_time" : 1677855913185, + "version" : "4.0.0.0", + "name" : "special_column_names", + "owner" : "ADMIN", + "status" : "ENABLED", + "create_time_utc" : 1677855913185, + "default_database" : "DEFAULT", + "description" : "", + "principal" : null, + "keytab" : null, + "maintain_model_type" : "MANUAL_MAINTAIN", + "override_kylin_properties" : { + "kylin.metadata.semi-automatic-mode" : "false", + "kylin.query.metadata.expose-computed-column" : "true", + "kylin.source.default" : "9" + }, + "segment_config" : { + "auto_merge_enabled" : false, + "auto_merge_time_ranges" : [ "WEEK", "MONTH", "QUARTER", "YEAR" ], + "volatile_range" : { + "volatile_range_number" : 0, + "volatile_range_enabled" : false, + "volatile_range_type" : "DAY" + }, + "retention_range" : { + "retention_range_number" : 1, + "retention_range_enabled" : false, + "retention_range_type" : "MONTH" + }, + "create_empty_segment_enabled" : false + } +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/dataflow/c41390c5-b35d-4db3-b167-029874b85a2c.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/dataflow/c41390c5-b35d-4db3-b167-029874b85a2c.json new file mode 100644 index 0000000000..c2e8cc33ed --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/dataflow/c41390c5-b35d-4db3-b167-029874b85a2c.json @@ -0,0 +1,13 @@ +{ + "uuid" : "c41390c5-b35d-4db3-b167-029874b85a2c", + "last_modified" : 0, + "create_time" : 1677856069390, + "version" : "4.0.0.0", + "status" : "OFFLINE", + "last_status" : null, + "cost" : 50, + "query_hit_count" : 0, + "last_query_time" : 0, + "layout_query_hit_count" : { }, + "segments" : [ ] +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/index_plan/c41390c5-b35d-4db3-b167-029874b85a2c.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/index_plan/c41390c5-b35d-4db3-b167-029874b85a2c.json new file mode 100644 index 0000000000..354bdadf2e --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/index_plan/c41390c5-b35d-4db3-b167-029874b85a2c.json @@ -0,0 +1,63 @@ +{ + "uuid" : "c41390c5-b35d-4db3-b167-029874b85a2c", + "last_modified" : 1677856069297, + "create_time" : 1677856069297, + "version" : "4.0.0.0", + "description" : null, + "rule_based_index" : null, + "indexes" : [ { + "id" : 0, + "dimensions" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17 ], + "measures" : [ 100000 ], + "layouts" 
: [ { + "id" : 1, + "name" : null, + "owner" : null, + "col_order" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 100000 ], + "shard_by_columns" : [ ], + "partition_by_columns" : [ ], + "sort_by_columns" : [ ], + "storage_type" : 20, + "update_time" : 1677856069309, + "manual" : false, + "auto" : false, + "base" : true, + "draft_version" : null, + "index_range" : null + } ], + "next_layout_offset" : 2 + }, { + "id" : 20000000000, + "dimensions" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17 ], + "measures" : [ ], + "layouts" : [ { + "id" : 20000000001, + "name" : null, + "owner" : null, + "col_order" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17 ], + "shard_by_columns" : [ ], + "partition_by_columns" : [ ], + "sort_by_columns" : [ ], + "storage_type" : 20, + "update_time" : 1677856069310, + "manual" : false, + "auto" : false, + "base" : true, + "draft_version" : null, + "index_range" : null + } ], + "next_layout_offset" : 2 + } ], + "override_properties" : { }, + "to_be_deleted_indexes" : [ ], + "auto_merge_time_ranges" : null, + "retention_range" : 0, + "engine_type" : 80, + "next_aggregation_index_id" : 10000, + "next_table_index_id" : 20000010000, + "agg_shard_by_columns" : [ ], + "extend_partition_columns" : [ ], + "layout_bucket_num" : { }, + "approved_additional_recs" : 0, + "approved_removal_recs" : 0 +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/model_desc/c41390c5-b35d-4db3-b167-029874b85a2c.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/model_desc/c41390c5-b35d-4db3-b167-029874b85a2c.json new file mode 100644 index 0000000000..13c33c24e2 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/model_desc/c41390c5-b35d-4db3-b167-029874b85a2c.json @@ -0,0 +1,158 @@ +{ + "uuid" : "c41390c5-b35d-4db3-b167-029874b85a2c", + "last_modified" : 1677856069295, + "create_time" : 1677856068306, + "version" : "4.0.0.0", + "alias" : "p_lineorder", + "owner" : "ADMIN", + "config_last_modifier" : null, + "config_last_modified" : 0, + "description" : "", + "fact_table" : "SSB.P_LINEORDER", + "fact_table_alias" : null, + "management_type" : "MODEL_BASED", + "join_tables" : [ ], + "filter_condition" : "", + "partition_desc" : { + "partition_date_column" : "P_LINEORDER.LO_ORDERDATE", + "partition_date_start" : 0, + "partition_date_format" : "yyyy-MM-dd", + "partition_type" : "APPEND", + "partition_condition_builder" : "org.apache.kylin.metadata.model.PartitionDesc$DefaultPartitionConditionBuilder" + }, + "capacity" : "MEDIUM", + "segment_config" : { + "auto_merge_enabled" : null, + "auto_merge_time_ranges" : null, + "volatile_range" : null, + "retention_range" : null, + "create_empty_segment_enabled" : false + }, + "data_check_desc" : null, + "semantic_version" : 0, + "storage_type" : 0, + "model_type" : "BATCH", + "all_named_columns" : [ { + "id" : 0, + "name" : "LO_SHIPMODE", + "column" : "P_LINEORDER.LO_SHIPMODE", + "status" : "DIMENSION" + }, { + "id" : 1, + "name" : "//LO_LINENUMBER", + "column" : "P_LINEORDER.//LO_LINENUMBER", + "status" : "DIMENSION" + }, { + "id" : 2, + "name" : "LO_ORDTOTALPRICE", + "column" : "P_LINEORDER.LO_ORDTOTALPRICE", + "status" : "DIMENSION" + }, { + "id" : 3, + "name" : "LO_SUPPLYCOST", + "column" : "P_LINEORDER.LO_SUPPLYCOST", + "status" : "DIMENSION" + }, { + "id" : 4, + "name" : "LO_SUPPKEY", + "column" : 
"P_LINEORDER.LO_SUPPKEY", + "status" : "DIMENSION" + }, { + "id" : 5, + "name" : "LO_QUANTITY", + "column" : "P_LINEORDER.LO_QUANTITY", + "status" : "DIMENSION" + }, { + "id" : 6, + "name" : "LO_PARTKEY", + "column" : "P_LINEORDER.LO_PARTKEY", + "status" : "DIMENSION" + }, { + "id" : 7, + "name" : "LO_ORDERKEY", + "column" : "P_LINEORDER.LO_ORDERKEY", + "status" : "DIMENSION" + }, { + "id" : 8, + "name" : "LO_CU//STKEY", + "column" : "P_LINEORDER.LO_CU//STKEY", + "status" : "DIMENSION" + }, { + "id" : 9, + "name" : "LO_SHIPPRIOTITY", + "column" : "P_LINEORDER.LO_SHIPPRIOTITY", + "status" : "DIMENSION" + }, { + "id" : 10, + "name" : "LO_DISCOUNT", + "column" : "P_LINEORDER.LO_DISCOUNT", + "status" : "DIMENSION" + }, { + "id" : 11, + "name" : "LO_ORDERPRIOTITY", + "column" : "P_LINEORDER.LO_ORDERPRIOTITY", + "status" : "DIMENSION" + }, { + "id" : 12, + "name" : "LO_ORDERDATE", + "column" : "P_LINEORDER.LO_ORDERDATE", + "status" : "DIMENSION" + }, { + "id" : 13, + "name" : "LO_REVENUE", + "column" : "P_LINEORDER.LO_REVENUE", + "status" : "DIMENSION" + }, { + "id" : 14, + "name" : "V_REVENUE", + "column" : "P_LINEORDER.V_REVENUE", + "status" : "DIMENSION" + }, { + "id" : 15, + "name" : "LO_COMMITDATE", + "column" : "P_LINEORDER.LO_COMMITDATE", + "status" : "DIMENSION" + }, { + "id" : 16, + "name" : "LO_EXTENDEDPRICE", + "column" : "P_LINEORDER.LO_EXTENDEDPRICE", + "status" : "DIMENSION" + }, { + "id" : 17, + "name" : "LO_TAX", + "column" : "P_LINEORDER.LO_TAX", + "status" : "DIMENSION" + } ], + "all_measures" : [ { + "name" : "COUNT_ALL", + "function" : { + "expression" : "COUNT", + "parameters" : [ { + "type" : "constant", + "value" : "1" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : null, + "id" : 100000, + "type" : "NORMAL", + "internal_ids" : [ ] + } ], + "recommendations_count" : 0, + "computed_columns" : [ ], + "canvas" : { + "coordinate" : { + "P_LINEORDER" : { + "x" : 530.6666395399305, + "y" : 249.00000678168405, + "width" : 220.0, + "height" : 200.0 + } + }, + "zoom" : 9.0 + }, + "multi_partition_desc" : null, + "multi_partition_key_mapping" : null, + "fusion_id" : null +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.CUSTOMER.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.CUSTOMER.json new file mode 100644 index 0000000000..ba734604c1 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.CUSTOMER.json @@ -0,0 +1,68 @@ +{ + "uuid" : "9481374f-63be-5f0b-5a6d-22c7a561b1d9", + "last_modified" : 0, + "create_time" : 1677856024986, + "version" : "4.0.0.0", + "name" : "CUSTOMER", + "columns" : [ { + "id" : "1", + "name" : "//C_CUSTKEY", + "datatype" : "integer", + "case_sensitive_name" : "c_custkey" + }, { + "id" : "2", + "name" : "C_N/@#AME", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_name" + }, { + "id" : "3", + "name" : "/C_ADDR//ESS", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_address" + }, { + "id" : "4", + "name" : "C_CITY", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_city" + }, { + "id" : "5", + "name" : "C_NATION", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_nation" + }, { + "id" : "6", + "name" : "C_REGION", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_region" + }, { + "id" : "7", + "name" : "C_PHONE", + "datatype" : "varchar(4096)", + 
"case_sensitive_name" : "c_phone" + }, { + "id" : "8", + "name" : "C_MKTSEGMENT", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_mktsegment" + } ], + "source_type" : 9, + "table_type" : "EXTERNAL", + "top" : false, + "increment_loading" : false, + "last_snapshot_path" : null, + "last_snapshot_size" : 0, + "snapshot_last_modified" : 0, + "query_hit_count" : 0, + "partition_column" : null, + "snapshot_partitions" : { }, + "snapshot_partitions_info" : { }, + "snapshot_total_rows" : 0, + "snapshot_partition_col" : null, + "selected_snapshot_partition_col" : null, + "temp_snapshot_path" : null, + "snapshot_has_broken" : false, + "database" : "SSB", + "transactional" : false, + "rangePartition" : false, + "partition_desc" : null +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.P_LINEORDER.json b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.P_LINEORDER.json new file mode 100644 index 0000000000..0ff6cd2b7d --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/special_column_names/metadata/special_column_names/table/SSB.P_LINEORDER.json @@ -0,0 +1,118 @@ +{ + "uuid" : "f0dc3d45-0eb0-26c0-4681-b6c0d0c8e270", + "last_modified" : 0, + "create_time" : 1677856025283, + "version" : "4.0.0.0", + "name" : "P_LINEORDER", + "columns" : [ { + "id" : "1", + "name" : "LO_ORDERKEY", + "datatype" : "bigint", + "case_sensitive_name" : "lo_orderkey" + }, { + "id" : "2", + "name" : "//LO_LINENUMBER", + "datatype" : "bigint", + "case_sensitive_name" : "//lo_linenumber" + }, { + "id" : "3", + "name" : "LO_CU//STKEY", + "datatype" : "integer", + "case_sensitive_name" : "lo_cu//stkey" + }, { + "id" : "4", + "name" : "LO_PARTKEY", + "datatype" : "integer", + "case_sensitive_name" : "lo_partkey" + }, { + "id" : "5", + "name" : "LO_SUPPKEY", + "datatype" : "integer", + "case_sensitive_name" : "lo_suppkey" + }, { + "id" : "6", + "name" : "LO_ORDERDATE", + "datatype" : "date", + "case_sensitive_name" : "lo_orderdate" + }, { + "id" : "7", + "name" : "LO_ORDERPRIOTITY", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "lo_orderpriotity" + }, { + "id" : "8", + "name" : "LO_SHIPPRIOTITY", + "datatype" : "integer", + "case_sensitive_name" : "lo_shippriotity" + }, { + "id" : "9", + "name" : "LO_QUANTITY", + "datatype" : "bigint", + "case_sensitive_name" : "lo_quantity" + }, { + "id" : "10", + "name" : "LO_EXTENDEDPRICE", + "datatype" : "bigint", + "case_sensitive_name" : "lo_extendedprice" + }, { + "id" : "11", + "name" : "LO_ORDTOTALPRICE", + "datatype" : "bigint", + "case_sensitive_name" : "lo_ordtotalprice" + }, { + "id" : "12", + "name" : "LO_DISCOUNT", + "datatype" : "bigint", + "case_sensitive_name" : "lo_discount" + }, { + "id" : "13", + "name" : "LO_REVENUE", + "datatype" : "bigint", + "case_sensitive_name" : "lo_revenue" + }, { + "id" : "14", + "name" : "LO_SUPPLYCOST", + "datatype" : "bigint", + "case_sensitive_name" : "lo_supplycost" + }, { + "id" : "15", + "name" : "LO_TAX", + "datatype" : "bigint", + "case_sensitive_name" : "lo_tax" + }, { + "id" : "16", + "name" : "LO_COMMITDATE", + "datatype" : "date", + "case_sensitive_name" : "lo_commitdate" + }, { + "id" : "17", + "name" : "LO_SHIPMODE", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "lo_shipmode" + }, { + "id" : "18", + "name" : "V_REVENUE", + "datatype" : "bigint", + "case_sensitive_name" : "v_revenue" + } ], + "source_type" : 9, + "table_type" : "MANAGED_TABLE", + "top" 
: false, + "increment_loading" : false, + "last_snapshot_path" : null, + "last_snapshot_size" : 0, + "snapshot_last_modified" : 0, + "query_hit_count" : 0, + "partition_column" : null, + "snapshot_partitions" : { }, + "snapshot_partitions_info" : { }, + "snapshot_total_rows" : 0, + "snapshot_partition_col" : null, + "selected_snapshot_partition_col" : null, + "temp_snapshot_path" : null, + "snapshot_has_broken" : false, + "database" : "SSB", + "transactional" : false, + "rangePartition" : false, + "partition_desc" : null +} \ No newline at end of file diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala index 76e9940270..13dbfcc987 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala @@ -15,30 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kylin.query.runtime import java.util -import java.util.Collections import org.apache.calcite.DataContext import org.apache.calcite.rel.{RelNode, RelVisitor} import org.apache.kylin.common.KylinConfig import org.apache.kylin.engine.spark.utils.LogEx -import org.apache.kylin.guava30.shaded.common.collect.Lists -import org.apache.kylin.query.relnode.{KapAggregateRel, KapFilterRel, KapJoinRel, KapLimitRel, KapMinusRel, KapModelViewRel, KapNonEquiJoinRel, KapProjectRel, KapRel, KapSortRel, KapTableScan, KapUnionRel, KapValuesRel, KapWindowRel} -import org.apache.kylin.query.runtime.plan.{AggregatePlan, FilterPlan, LimitPlan, ProjectPlan, SortPlan, TableScanPlan, ValuesPlan, WindowPlan} -import org.apache.kylin.query.util.KapRelUtil -import org.apache.spark.sql.DataFrame - -import scala.collection.JavaConverters._ - +import org.apache.kylin.query.relnode._ +import org.apache.kylin.query.runtime.plan._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.{DataFrame, SparderEnv, SparkInternalAgent} class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with LogEx { - private val stack = new util.Stack[DataFrame]() + private val stack = new util.Stack[LogicalPlan]() private val setOpStack = new util.Stack[Int]() private var unionLayer = 0 - private val dataframeCache = KylinConfig.getInstanceFromEnv.isDataFrameCacheEnabled // clear cache before any op cleanCache() @@ -61,24 +54,28 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log stack.push(node match { case rel: KapTableScan => convertTableScan(rel) case rel: KapFilterRel => - logTime("filter") { FilterPlan.filter(Lists.newArrayList(stack.pop()), rel, dataContext) } + logTime("filter") { + FilterPlan.filter(stack.pop(), rel, dataContext) + } case rel: KapProjectRel => logTime("project") { - actionWith(rel) { - ProjectPlan.select(Lists.newArrayList(stack.pop()), rel, dataContext) - } + ProjectPlan.select(stack.pop(), rel, dataContext) } case rel: KapLimitRel => - logTime("limit") { LimitPlan.limit(Lists.newArrayList(stack.pop()), rel, dataContext) } + logTime("limit") { + LimitPlan.limit(stack.pop(), rel, dataContext) + } case rel: KapSortRel => - logTime("sort") { SortPlan.sort(Lists.newArrayList(stack.pop()), rel, dataContext) } + logTime("sort") { + SortPlan.sort(stack.pop(), rel, dataContext) + } case rel: 
KapWindowRel => - logTime("window") { WindowPlan.window(Lists.newArrayList(stack.pop()), rel, dataContext) } + logTime("window") { + WindowPlan.window(stack.pop(), rel, dataContext) + } case rel: KapAggregateRel => logTime("agg") { - actionWith(rel) { - AggregatePlan.agg(Lists.newArrayList(stack.pop()), rel) - } + AggregatePlan.agg(stack.pop(), rel) } case rel: KapJoinRel => convertJoinRel(rel) case rel: KapNonEquiJoinRel => convertNonEquiJoinRel(rel) @@ -88,21 +85,29 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log if (KylinConfig.getInstanceFromEnv.isCollectUnionInOrder) { unionBlocks = unionBlocks.reverse } - logTime("union") { plan.UnionPlan.union(unionBlocks.asJava, rel, dataContext) } + logTime("union") { + plan.UnionPlan.union(unionBlocks, rel, dataContext) + } case rel: KapMinusRel => val size = setOpStack.pop() - logTime("minus") { plan.MinusPlan.minus(Range(0, stack.size() - size).map(a => stack.pop()).reverse, rel, dataContext) } + logTime("minus") { + plan.MinusPlan.minus(Range(0, stack.size() - size).map(a => stack.pop()).reverse, rel, dataContext) + } case rel: KapValuesRel => - logTime("values") { ValuesPlan.values(rel) } + logTime("values") { + ValuesPlan.values(rel) + } case rel: KapModelViewRel => - logTime("modelview") { stack.pop() } + logTime("modelview") { + stack.pop() + } }) if (node.isInstanceOf[KapUnionRel]) { unionLayer = unionLayer - 1 } } - private def convertTableScan(rel: KapTableScan): DataFrame = { + private def convertTableScan(rel: KapTableScan): LogicalPlan = { rel.getContext.genExecFunc(rel, rel.getTableName) match { case "executeLookupTableQuery" => logTime("createLookupTable") { @@ -123,7 +128,7 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log } } - private def convertJoinRel(rel: KapJoinRel): DataFrame = { + private def convertJoinRel(rel: KapJoinRel): LogicalPlan = { if (!rel.isRuntimeJoin) { rel.getContext.genExecFunc(rel, "") match { case "executeMetadataQuery" => @@ -139,12 +144,12 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log val right = stack.pop() val left = stack.pop() logTime("join") { - plan.JoinPlan.join(Lists.newArrayList(left, right), rel) + plan.JoinPlan.join(Seq.apply(left, right), rel) } } } - private def convertNonEquiJoinRel(rel: KapNonEquiJoinRel): DataFrame = { + private def convertNonEquiJoinRel(rel: KapNonEquiJoinRel): LogicalPlan = { if (!rel.isRuntimeJoin) { logTime("join with table scan") { TableScanPlan.createOLAPTable(rel) @@ -153,65 +158,17 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends RelVisitor with Log val right = stack.pop() val left = stack.pop() logTime("non-equi join") { - plan.JoinPlan.nonEquiJoin(Lists.newArrayList(left, right), rel, dataContext) - } - } - } - - def actionWith(rel: KapRel)(body: => DataFrame): DataFrame = { - if (!dataframeCache) { - body - } else { - actionWithCache(rel) { - body - } - } - } - - protected def actionWithCache(rel: KapRel)(body: => DataFrame): DataFrame = { - var layoutId = 0L - var modelId = "" - var pruningSegmentHashCode = 0 - if (rel.getDigest == null) { - body - } else { - try { - val storageContext = rel.getContext.storageContext - val layoutEntity = storageContext.getCandidate.getLayoutEntity - val prunedSegments = storageContext.getPrunedSegments - val tempTimeRange = new StringBuilder() - if (prunedSegments != null) { - prunedSegments.forEach(segment => { - tempTimeRange.append(segment.getName) - }) - } - pruningSegmentHashCode = 
tempTimeRange.toString().hashCode - layoutId = layoutEntity.getId - modelId = layoutEntity.getModel.getId + "_" + pruningSegmentHashCode - } catch { - case e: Throwable => logWarning(s"Calculate layoutId modelId failed ex:${e.getMessage}") - } - rel.recomputeDigest() - val digestWithoutId = KapRelUtil.getDigestWithoutRelNodeId(rel.getDigest, layoutId, modelId) - if (unionLayer >= 1 && TableScanPlan.cacheDf.get.containsKey(digestWithoutId)) { - stack.pop() - logInfo("Happen Optimized from cache dataframe CacheKey:" + digestWithoutId) - TableScanPlan.cacheDf.get.get(digestWithoutId) - } else { - val df = body - if (unionLayer >= 1) { - TableScanPlan.cacheDf.get.put(digestWithoutId, df) - } - df + plan.JoinPlan.nonEquiJoin(Seq.apply(left, right), rel, dataContext) } } } def cleanCache(): Unit = { - TableScanPlan.cacheDf.get().clear() + TableScanPlan.cachePlan.get().clear() } def getResult(): DataFrame = { - stack.pop() + val logicalPlan = stack.pop() + SparkInternalAgent.getDataFrame(SparderEnv.getSparkSession, logicalPlan) } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala index e2a37a0c7c..2adac31699 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala @@ -22,37 +22,38 @@ package org.apache.kylin.query.runtime import java.math.BigDecimal import java.sql.Timestamp import java.time.ZoneId + import org.apache.calcite.DataContext import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ import org.apache.calcite.sql.SqlKind._ import org.apache.calcite.sql.`type`.{BasicSqlType, IntervalSqlType, SqlTypeFamily, SqlTypeName} -import org.apache.calcite.sql.fun.SqlDatetimeSubtractionOperator -import org.apache.calcite.sql.fun.SqlDateTimeDivisionOperator +import org.apache.calcite.sql.fun.{SqlDateTimeDivisionOperator, SqlDatetimeSubtractionOperator} import org.apache.kylin.common.util.DateFormat import org.apache.spark.sql.KapFunctions._ import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataTypes, DateType, LongType, TimestampType} -import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.util.SparderTypeUtil +import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer - /** - * Convert RexNode to a nested Column - * - * @param inputFieldNames fieldNames - * @param rowType rowtyple - * @param dataContext context - */ -class SparderRexVisitor(val inputFieldNames: Array[String], +/** + * Convert RexNode to a nested Column + * + * @param inputFieldNames fieldNames + * @param rowType rowtyple + * @param dataContext context + */ +class SparderRexVisitor(val inputFieldNames: Seq[String], val rowType: RelDataType, val dataContext: DataContext) - extends RexVisitorImpl[Any](true) { + extends RexVisitorImpl[Any](true) { def this(dfs: Array[DataFrame], rowType: RelDataType, @@ -62,6 +63,10 @@ class SparderRexVisitor(val inputFieldNames: Array[String], rowType: RelDataType, dataContext: DataContext) = this(Array(df), rowType, dataContext) + 
def this(plan: LogicalPlan, + rowType: RelDataType, + dataContext: DataContext) = this(plan.output.map(c => c.name), rowType, dataContext) + // scalastyle:off override def visitCall(call: RexCall): Any = { @@ -297,11 +302,11 @@ class SparderRexVisitor(val inputFieldNames: Array[String], val v = convertFilterValueAfterAggr(literal) v match { case Some(toReturn) => toReturn - case None => null + case None => null } } - case class MonthNum(num: Column) + case class MonthNum(num: Column) // as underlying schema types for cuboid table are all "string", // we rely spark to convert the cuboid col data from string to real type to finish comparing @@ -313,12 +318,12 @@ class SparderRexVisitor(val inputFieldNames: Array[String], literal.getType match { case t: IntervalSqlType => { if (Seq("MONTH", "YEAR", "QUARTER").contains( - t.getIntervalQualifier.timeUnitRange.name)) { + t.getIntervalQualifier.timeUnitRange.name)) { return Some( MonthNum(k_lit(literal.getValue.asInstanceOf[BigDecimal].intValue))) } if (literal.getType.getFamily - .asInstanceOf[SqlTypeFamily] == SqlTypeFamily.INTERVAL_DAY_TIME) { + .asInstanceOf[SqlTypeFamily] == SqlTypeFamily.INTERVAL_DAY_TIME) { return Some( SparderTypeUtil.toSparkTimestamp( new java.math.BigDecimal(literal.getValue.toString).longValue())) diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java index 2d5d510894..b045f700f0 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java @@ -26,6 +26,7 @@ import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.QueryTrace; import org.apache.kylin.common.exception.DryRunSucceedException; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; import org.apache.kylin.query.engine.exec.ExecuteResult; import org.apache.kylin.query.engine.exec.sparder.QueryEngine; import org.apache.kylin.query.mask.QueryResultMasks; @@ -35,8 +36,6 @@ import org.apache.spark.sql.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; - public class SparkEngine implements QueryEngine { private static final Logger log = LoggerFactory.getLogger(SparkEngine.class); @@ -50,10 +49,11 @@ public class SparkEngine implements QueryEngine { } finally { calciteToSparkPlaner.cleanCache(); } - long takeTime = System.currentTimeMillis() - start; + Dataset<Row> df = calciteToSparkPlaner.getResult(); QueryContext.current().record("to_spark_plan"); + long takeTime = System.currentTimeMillis() - start; log.info("Plan take {} ms", takeTime); - return calciteToSparkPlaner.getResult(); + return df; } @Override diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala index a44b338cbb..de91678398 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala @@ -17,6 +17,8 @@ */ package org.apache.kylin.query.runtime.plan +import java.util.Locale + import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rex.RexLiteral import 
org.apache.calcite.sql.SqlKind @@ -26,19 +28,18 @@ import org.apache.kylin.measure.percentile.PercentileCounter import org.apache.kylin.metadata.model.FunctionDesc import org.apache.kylin.query.relnode.{KapAggregateRel, KapProjectRel, KylinAggregateCall, OLAPAggregateRel} import org.apache.kylin.query.util.RuntimeHelper -import org.apache.spark.sql.KapFunctions._ +import org.apache.spark.sql.KapFunctions.{k_lit, sum0} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile -import org.apache.spark.sql.catalyst.expressions.{CreateArray, In} -import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateArray, In} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.util.ArrayData import org.apache.spark.sql.execution.utils.SchemaProcessor import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{ArrayType, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.udaf.SingleValueAgg import org.apache.spark.sql.util.SparderTypeUtil -import java.util.Locale import scala.collection.JavaConverters._ // scalastyle:off @@ -47,12 +48,12 @@ object AggregatePlan extends LogEx { List("PERCENTILE", "PERCENTILE_APPROX", "INTERSECT_COUNT", "COUNT_DISTINCT", "BITMAP_UUID", FunctionDesc.FUNC_BITMAP_BUILD, FunctionDesc.FUNC_SUM_LC) - def agg(inputs: java.util.List[DataFrame], - rel: KapAggregateRel): DataFrame = logTime("aggregate", debug = true) { + def agg(plan: LogicalPlan, + rel: KapAggregateRel): LogicalPlan = logTime("aggregate", debug = true) { + + val schemaNames = plan.output - var dataFrame = inputs.get(0) - val schemaNames = dataFrame.schema.fieldNames - val groupList = rel.getRewriteGroupKeys.asScala.map(groupId => col(schemaNames.apply(groupId))).toList + val groupList = rel.getRewriteGroupKeys.asScala.map(groupId => col(schemaNames.apply(groupId).name)).toList if (rel.getContext != null && rel.getContext.isExactlyAggregate && !rel.getContext.isNeedToManyDerived) { // exactly match, skip agg, direct project. 
@@ -60,8 +61,8 @@ object AggregatePlan extends LogEx { case (call: KylinAggregateCall, index: Int) => val funcName = OLAPAggregateRel.getAggrFuncName(call); val dataType = call.getFunc.getReturnDataType - val argNames = call.getArgList.asScala.map(dataFrame.schema.names.apply(_)) - val columnName = argNames.map(col) + val argNames = call.getArgList.asScala.map(schemaNames.apply(_).name) + val columnName = argNames.map(name => col(name)) val hash = System.identityHashCode(rel).toString funcName match { case FunctionDesc.FUNC_COUNT_DISTINCT => @@ -70,7 +71,7 @@ object AggregatePlan extends LogEx { KapFunctions.approx_count_distinct_decode(columnName.head, dataType.getPrecision).alias(aggName) } else if (call.isBitmapCountDistinctFunc) { if (rel.getContext.isExactlyFastBitmap) { - col(schemaNames.apply(call.getArgList.get(0))) + col(schemaNames.apply(call.getArgList.get(0)).name) } else { val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, "PRECISE_COUNT_DISTINCT_DECODE", hash, argNames: _*) KapFunctions.precise_count_distinct_decode(columnName.head).alias(aggName) @@ -90,32 +91,33 @@ object AggregatePlan extends LogEx { val sparkDataType = SparderTypeUtil.toSparkType(dataType) KapFunctions.k_sum_lc_decode(columnName.head, sparkDataType.json).alias(aggName) case _ => - col(schemaNames.apply(call.getArgList.get(0))) + col(schemaNames.apply(call.getArgList.get(0)).name) } case (call: Any, _: Int) => - col(schemaNames.apply(call.getArgList.get(0))) + col(schemaNames.apply(call.getArgList.get(0)).name) }.toList val prjList = groupList ++ aggCols logInfo(s"Query exactly match index, skip agg, project $prjList.") - dataFrame.select(prjList: _*) + SparkOperation.project(prjList, plan) } else { - dataFrame = genFiltersWhenIntersectCount(rel, dataFrame) - val aggList = buildAgg(dataFrame.schema, rel) + val intersectplan = genFiltersWhenIntersectCount(rel, plan) + val aggList = buildAgg(intersectplan.output, rel, plan) val groupSets = rel.getRewriteGroupSets.asScala - .map(groupSet => groupSet.asScala.map(groupId => col(schemaNames.apply(groupId))).toList).toList - SparkOperation.agg(AggArgc(dataFrame, groupList, aggList, groupSets, rel.isSimpleGroupType)) + .map(groupSet => groupSet.asScala.map(groupId => col(schemaNames.apply(groupId).name)).toList).toList + SparkOperation.agg(AggArgc(intersectplan, groupList, aggList, groupSets, rel.isSimpleGroupType)) } } - private def genFiltersWhenIntersectCount(rel: KapAggregateRel, dataFrame: DataFrame): DataFrame = { + private def genFiltersWhenIntersectCount(rel: KapAggregateRel, plan: LogicalPlan): LogicalPlan = { try { + val names = plan.output + val intersects = rel.getRewriteAggCalls.asScala.filter(_.isInstanceOf[KylinAggregateCall]) .filter(!_.asInstanceOf[KylinAggregateCall].getFunc.isCount) .map(_.asInstanceOf[KylinAggregateCall]) .filter(call => !call.getFunc.isCount && OLAPAggregateRel.getAggrFuncName(call).equals(FunctionDesc.FUNC_INTERSECT_COUNT)) - val names = dataFrame.schema.names - val children = dataFrame.queryExecution.logical + val children = plan if (intersects.nonEmpty && intersects.size == rel.getRewriteAggCalls.size() && children.isInstanceOf[Project]) { // only exists intersect count function in agg val list = children.asInstanceOf[Project].projectList @@ -128,24 +130,28 @@ object AggregatePlan extends LogEx { val filters = intersects.map { call => val filterColumnIndex = call.getArgList.get(1) val litIndex = call.getArgList.get(2) - new Column(In(col(names(filterColumnIndex)).expr, 
list.apply(litIndex).children.head.asInstanceOf[CreateArray].children)) + new Column(In(col(names(filterColumnIndex).name).expr, list.apply(litIndex).children.head.asInstanceOf[CreateArray].children)) } val column = filters.reduceLeft(_.or(_)) - dataFrame.filter(column) + + val filterPlan = Filter(column.expr, plan) + SparkOperation.project(plan.output.map(c => col(c.name)), filterPlan) } else { - dataFrame + plan } } else { - dataFrame + plan } } catch { case e: Throwable => logWarning("Error occurred when generate filters", e) - dataFrame + plan } } - def buildAgg(schema: StructType, - rel: KapAggregateRel): List[Column] = { + // S53 Fix https://olapio.atlassian.net/browse/KE-42473 + def buildAgg(schema: Seq[Attribute], + rel: KapAggregateRel, + plan: LogicalPlan): List[Column] = { val hash = System.identityHashCode(rel).toString rel.getRewriteAggCalls.asScala.zipWithIndex.map { @@ -155,8 +161,8 @@ object AggregatePlan extends LogEx { val isCount = call.getFunc.isCount val funcName = if (isCount) FunctionDesc.FUNC_COUNT else OLAPAggregateRel.getAggrFuncName(call) - val argNames = call.getArgList.asScala.map(schema.names.apply(_)) - val columnName = argNames.map(col) + val argNames = call.getArgList.asScala.map(schema.apply(_).name) + val columnName = argNames.map(name => col(name)) val registeredFuncName = RuntimeHelper.registerSingleByColName(funcName, dataType) val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, funcName, hash, argNames: _*) if (funcName == FunctionDesc.FUNC_COUNT_DISTINCT) { @@ -173,8 +179,9 @@ object AggregatePlan extends LogEx { KapFunctions.precise_bitmap_build(columnName.head).alias(aggName) } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_INTERSECT_COUNT)) { require(columnName.size >= 3, s"Input columns size ${columnName.size} don't greater than or equal to 3.") + val resolvedPlan = SparkInternalAgent.getDataFrame(SparderEnv.getSparkSession, plan) val columns = columnName.slice(0, 3).zipWithIndex.map { - case (column: Column, 2) => column.cast(ArrayType.apply(schema.fields.apply(call.getArgList.get(1)).dataType)) + case (column: Column, 2) => column.cast(ArrayType.apply(resolvedPlan.schema.apply(call.getArgList.get(1)).dataType)) case (column: Column, _) => column } val separator = s"\\${KylinConfig.getInstanceFromEnv.getIntersectFilterOrSeparator}" @@ -198,9 +205,8 @@ object AggregatePlan extends LogEx { } case (call: Any, index: Int) => val funcName = OLAPAggregateRel.getAggrFuncName(call) - val schemaNames = schema.names - val argNames = call.getArgList.asScala.map(id => schemaNames.apply(id)) - val columnName = argNames.map(col) + val argNames = call.getArgList.asScala.map(id => schema.apply(id).name) + val columnName = argNames.map(name => col(name)) val inputType = call.getType val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, funcName, @@ -211,7 +217,11 @@ object AggregatePlan extends LogEx { rel.getInput match { case projectRel: KapProjectRel => val percentageArg = projectRel.getChildExps.get(call.getArgList.get(1)) - val accuracyArg = if (call.getArgList.size() < 3) { None } else { Some(projectRel.getChildExps.get(call.getArgList.get(2))) } + val accuracyArg = if (call.getArgList.size() < 3) { + None + } else { + Some(projectRel.getChildExps.get(call.getArgList.get(2))) + } (percentageArg, accuracyArg) match { case (percentageLitRex: RexLiteral, accuracyArgLitRex: Option[RexLiteral]) => if (KylinConfig.getInstanceFromEnv.getPercentileApproxAlgorithm.equalsIgnoreCase("t-digest")) { @@ -260,7 +270,8 @@ object 
AggregatePlan extends LogEx { KapFunctions.precise_bitmap_build_pushdown(columnName.head).alias(aggName) // Issue 4337: Supported select (select '2012-01-02') as data, xxx from table group by xxx case SqlKind.SINGLE_VALUE.sql => - SingleValueAgg(schema.head).apply(col(argNames.head)).alias(aggName) + val structField = StructField(schema.head.name, SparderTypeUtil.convertSqlTypeToSparkType(inputType), true, Metadata.empty) + SingleValueAgg(structField).apply(col(argNames.head)).alias(aggName) case FunctionDesc.FUNC_GROUPING => if (!rel.isSimpleGroupType) { grouping(argNames.head).alias(aggName) diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/FilterPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/FilterPlan.scala index 727e8bd2ab..494f390796 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/FilterPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/FilterPlan.scala @@ -17,22 +17,23 @@ */ package org.apache.kylin.query.runtime.plan -import org.apache.kylin.engine.spark.utils.LogEx import org.apache.calcite.DataContext +import org.apache.kylin.engine.spark.utils.LogEx import org.apache.kylin.query.relnode.KapFilterRel import org.apache.kylin.query.runtime.SparderRexVisitor -import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} object FilterPlan extends LogEx { - def filter( - inputs: java.util.List[DataFrame], - rel: KapFilterRel, - dataContext: DataContext): DataFrame = logTime("filter", debug = true) { - val df = inputs.get(0) - val visitor = new SparderRexVisitor(df, - rel.getInput.getRowType, - dataContext) - val filterColumn = rel.getCondition.accept(visitor).asInstanceOf[Column] - df.filter(filterColumn) + def filter(plan: LogicalPlan, + rel: KapFilterRel, + dataContext: DataContext): LogicalPlan = logTime("filter", debug = true) { + + val visitor = new SparderRexVisitor(plan.output.map(_.name), + rel.getInput.getRowType, + dataContext) + val filterColumn = rel.getCondition.accept(visitor).asInstanceOf[Column] + + Filter(filterColumn.expr, plan) } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/JoinPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/JoinPlan.scala index 0a67247f5d..a43ad2b954 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/JoinPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/JoinPlan.scala @@ -18,30 +18,32 @@ package org.apache.kylin.query.runtime.plan import java.util + import org.apache.calcite.DataContext import org.apache.calcite.rex.RexCall +import org.apache.kylin.engine.spark.utils.LogEx import org.apache.kylin.query.relnode.{KapJoinRel, KapNonEquiJoinRel} import org.apache.kylin.query.runtime.SparderRexVisitor import org.apache.kylin.query.util.KapRelUtil -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.{Cross, JoinType} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.{Column, SparkOperation} import scala.collection.JavaConverters._ -object JoinPlan { - def nonEquiJoin(inputs: java.util.List[DataFrame], - rel: KapNonEquiJoinRel, dataContext: 
DataContext): DataFrame = { - val lDataFrame = inputs.get(0) - val rDataFrame = inputs.get(1) - val lSchemaNames = lDataFrame.schema.fieldNames.map("l_" + _) - val rSchemaNames = rDataFrame.schema.fieldNames.map("r_" + _) - // val schema = statefulDF.indexSchema - val newLDataFrame = inputs.get(0).toDF(lSchemaNames: _*) - val newRDataFrame = inputs.get(1).toDF(rSchemaNames: _*) +object JoinPlan extends LogEx { + def nonEquiJoin(plans: Seq[LogicalPlan], + rel: KapNonEquiJoinRel, dataContext: DataContext): LogicalPlan = { + var lPlan = plans.apply(0) + var rPlan = plans.apply(1) + + lPlan = SparkOperation.project(lPlan.output.map(c => col(c.name).alias("l_" + c.name)), lPlan) + rPlan = SparkOperation.project(rPlan.output.map(c => col(c.name).alias("r_" + c.name)), rPlan) // slice lSchemaNames with rel.getLeftInputSizeBeforeRewrite // to strip off the fields added during rewrite // as those field will disturb the original index based join condition - val visitor = new SparderRexVisitor(Array(lSchemaNames.slice(0, rel.getLeftInputSizeBeforeRewrite), rSchemaNames).flatten, + val visitor = new SparderRexVisitor(Seq(lPlan.output.map(c => c.name).slice(0, rel.getLeftInputSizeBeforeRewrite), rPlan.output.map(c => c.name)).flatten, null, dataContext) val pairs = new util.ArrayList[org.apache.kylin.common.util.Pair[Integer, Integer]]() @@ -62,31 +64,31 @@ object JoinPlan { equalCond = equalCond.and(actRemaining.accept(visitor).asInstanceOf[Column]) } - newLDataFrame.join(newRDataFrame, equalCond, rel.getJoinType.lowerName) + Join(lPlan, rPlan, joinType = JoinType(rel.getJoinType.lowerName), Some(equalCond.expr), JoinHint.NONE) } else { val conditionExprCol = rel.getCondition.accept(visitor).asInstanceOf[Column] - newLDataFrame.join(newRDataFrame, conditionExprCol, rel.getJoinType.lowerName) + Join(lPlan, rPlan, joinType = JoinType(rel.getJoinType.lowerName), Some(conditionExprCol.expr), JoinHint.NONE) } } - def join(inputs: java.util.List[DataFrame], - rel: KapJoinRel): DataFrame = { + // scalastyle:off + def join(plans: Seq[LogicalPlan], + rel: KapJoinRel): LogicalPlan = { + + var lPlan = plans.apply(0) + var rPlan = plans.apply(1) + + lPlan = SparkOperation.project(lPlan.output.map(c => col(c.name).alias("l_" + c.name)), lPlan) + rPlan = SparkOperation.project(rPlan.output.map(c => col(c.name).alias("r_" + c.name)), rPlan) - val lDataFrame = inputs.get(0) - val rDataFrame = inputs.get(1) - val lSchemaNames = lDataFrame.schema.fieldNames.map("l_" + _) - val rSchemaNames = rDataFrame.schema.fieldNames.map("r_" + _) - // val schema = statefulDF.indexSchema - val newLDataFrame = inputs.get(0).toDF(lSchemaNames: _*) - val newRDataFrame = inputs.get(1).toDF(rSchemaNames: _*) var joinCol: Column = null // todo utils rel.getLeftKeys.asScala .zip(rel.getRightKeys.asScala) .foreach(tuple => { - val col1 = col(lSchemaNames.apply(tuple._1)) - val col2 = col(rSchemaNames.apply(tuple._2)) + val col1 = col(lPlan.output.apply(tuple._1).name) + val col2 = col(rPlan.output.apply(tuple._2).name) val equalCond = makeEqualCond(col1, col2, rel.isJoinCondEqualNullSafe) if (joinCol == null) { @@ -96,9 +98,9 @@ object JoinPlan { } }) if (joinCol == null) { - newLDataFrame.crossJoin(newRDataFrame) + Join(lPlan, rPlan, joinType = Cross, None, JoinHint.NONE) } else { - newLDataFrame.join(newRDataFrame, joinCol, rel.getJoinType.lowerName) + Join(lPlan, rPlan, joinType = JoinType(rel.getJoinType.lowerName), Some(joinCol.expr), JoinHint.NONE) } } diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/LimitPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/LimitPlan.scala index 8669f4c8f6..385b4cec28 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/LimitPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/LimitPlan.scala @@ -20,27 +20,24 @@ package org.apache.kylin.query.runtime.plan import org.apache.calcite.DataContext import org.apache.kylin.query.relnode.KapLimitRel import org.apache.kylin.query.runtime.SparderRexVisitor -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan, Offset} -// scalastyle:off object LimitPlan { - def limit(inputs: java.util.List[DataFrame], + def limit(plan: LogicalPlan, rel: KapLimitRel, - dataContext: DataContext): DataFrame = { - // val schema = statefulDF.indexSchema - val visitor = new SparderRexVisitor(inputs.get(0), + dataContext: DataContext): LogicalPlan = { + val visitor = new SparderRexVisitor(plan, rel.getInput.getRowType, dataContext) val limit = BigDecimal(rel.localFetch.accept(visitor).toString).toInt - if (rel.localOffset == null) { - inputs - .get(0) - .limit(limit) - } else { + + if (rel.localOffset != null) { val offset = BigDecimal(rel.localOffset.accept(visitor).toString).toInt - inputs - .get(0) - .offset(offset).limit(limit) + val offsetPlan = Offset(Literal(offset), plan) + Limit(Literal(limit), offsetPlan) + } else { + Limit(Literal(limit), plan) } } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/MinusPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/MinusPlan.scala index 265c59bea2..e873a03f79 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/MinusPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/MinusPlan.scala @@ -19,18 +19,16 @@ package org.apache.kylin.query.runtime.plan import org.apache.calcite.DataContext import org.apache.kylin.query.relnode.KapMinusRel -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.plans.logical.{Except, LogicalPlan} object MinusPlan { - - def minus( - inputs: Seq[DataFrame], - rel: KapMinusRel, - dataContext: DataContext): DataFrame = { + def minus(plans: Seq[LogicalPlan], + rel: KapMinusRel, + dataContext: DataContext): LogicalPlan = { if (rel.all) { - inputs.reduce((set1, set2) => set1.exceptAll(set2)) + plans.reduce((p1, p2) => Except(p1, p2, true)) } else { - inputs.reduce((set1, set2) => set1.except(set2)) + plans.reduce((p1, p2) => Except(p1, p2, false)) } } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ProjectPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ProjectPlan.scala index 8f25c2312c..76c261a513 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ProjectPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ProjectPlan.scala @@ -17,26 +17,27 @@ */ package org.apache.kylin.query.runtime.plan -import org.apache.kylin.engine.spark.utils.LogEx import org.apache.calcite.DataContext import org.apache.calcite.rex.RexInputRef +import org.apache.kylin.engine.spark.utils.LogEx import org.apache.kylin.query.relnode.KapProjectRel import 
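LimitPlan and MinusPlan above take the same route with the Limit and Except operators. A compact sketch of those two constructions follows, under the same package-placement assumption as the previous sketch and with made-up data; the Offset node used for rel.localOffset is left out because it only exists in Spark builds that ship the OFFSET operator.

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Except, Limit}

object LimitExceptSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("limit-except-sketch").getOrCreate()
    import spark.implicits._

    val left = Seq(1, 2, 3, 4).toDF("id")
    val right = Seq(3, 4).toDF("id")

    // Limit(Literal(n), child) replaces df.limit(n).
    Dataset.ofRows(spark, Limit(Literal(2), left.queryExecution.logical)).show()

    // Except(left, right, isAll = false) corresponds to df.except; isAll = true to df.exceptAll.
    Dataset.ofRows(spark,
      Except(left.queryExecution.logical, right.queryExecution.logical, isAll = false)).show()

    spark.stop()
  }
}
```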
org.apache.kylin.query.runtime.SparderRexVisitor -import org.apache.spark.sql.{Column, DataFrame} -import org.apache.spark.sql.KapFunctions._ +import org.apache.spark.sql.KapFunctions.k_lit +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.{Column, SparkOperation} import scala.collection.JavaConverters._ - object ProjectPlan extends LogEx { - def select(inputs: java.util.List[DataFrame], + + def select(plan: LogicalPlan, rel: KapProjectRel, - dataContext: DataContext): DataFrame = logTime("project", debug = true) { - val df = inputs.get(0) + dataContext: DataContext): LogicalPlan = { val duplicatedColumnsCount = collection.mutable.Map[Column, Int]() + val selectedColumns = rel.rewriteProjects.asScala .map(rex => { - val visitor = new SparderRexVisitor(df, + val visitor = new SparderRexVisitor(plan, rel.getInput.getRowType, dataContext) (rex.accept(visitor), rex.isInstanceOf[RexInputRef]) @@ -62,6 +63,6 @@ object ProjectPlan extends LogEx { } }) - df.select(selectedColumns: _*) + SparkOperation.project(selectedColumns, plan) } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala index 3f23ab26ac..fb7167f03e 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala @@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets import java.util.concurrent.atomic.AtomicLong import java.{lang, util} -import io.kyligence.kap.secondstorage.SecondStorageUtil import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.Path @@ -50,6 +49,8 @@ import scala.collection.JavaConverters._ import scala.collection.convert.ImplicitConversions.`iterator asScala` import scala.collection.mutable +import io.kyligence.kap.secondstorage.SecondStorageUtil + // scalastyle:off object ResultType extends Enumeration { type ResultType = Value diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/SortPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/SortPlan.scala index 9d8c87db2b..bab7c16b60 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/SortPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/SortPlan.scala @@ -17,27 +17,25 @@ */ package org.apache.kylin.query.runtime.plan -import org.apache.kylin.engine.spark.utils.LogEx import org.apache.calcite.DataContext +import org.apache.kylin.engine.spark.utils.LogEx import org.apache.kylin.query.relnode.KapSortRel import org.apache.kylin.query.runtime.SparderRexVisitor -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.KapFunctions._ +import org.apache.spark.sql.KapFunctions.k_lit +import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, NullsFirst, NullsLast, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} import scala.collection.JavaConverters._ - object SortPlan extends LogEx { - def sort(inputs: java.util.List[DataFrame], + def sort(plan: LogicalPlan, rel: KapSortRel, - dataContext: DataContext): DataFrame = logTime("sort", debug = true) { - - val dataFrame = inputs.get(0) + dataContext: DataContext): LogicalPlan = logTime("sort", debug = true) 
{ val columns = rel.getChildExps.asScala .map(rex => { - val visitor = new SparderRexVisitor(dataFrame, - rel.getInput.getRowType, + val visitor = new SparderRexVisitor(plan.output.map(_.name), + rel.getInput.getRowType, dataContext) rex.accept(visitor) }) @@ -47,33 +45,33 @@ object SortPlan extends LogEx { val collation = rel.collation.getFieldCollations.get(pair._2) /** From Calcite: org.apache.calcite.rel.RelFieldCollation.Direction#defaultNullDirection - * Returns the null direction if not specified. Consistent with Oracle, - * NULLS are sorted as if they were positive infinity. */ + * Returns the null direction if not specified. Consistent with Oracle, + * NULLS are sorted as if they were positive infinity. */ (collation.direction, collation.nullDirection) match { case ( - org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING, - org.apache.calcite.rel.RelFieldCollation.NullDirection.UNSPECIFIED) => - pair._1.asc_nulls_last + org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING, + org.apache.calcite.rel.RelFieldCollation.NullDirection.UNSPECIFIED) => + SortOrder(pair._1.expr, Ascending, NullsLast, Seq.empty) case (org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING, - org.apache.calcite.rel.RelFieldCollation.NullDirection.LAST) => - pair._1.asc_nulls_last + org.apache.calcite.rel.RelFieldCollation.NullDirection.LAST) => + SortOrder(pair._1.expr, Ascending, NullsLast, Seq.empty) case (org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING, - org.apache.calcite.rel.RelFieldCollation.NullDirection.FIRST) => - pair._1.asc_nulls_first + org.apache.calcite.rel.RelFieldCollation.NullDirection.FIRST) => + SortOrder(pair._1.expr, Ascending, NullsFirst, Seq.empty) case ( - org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING, - org.apache.calcite.rel.RelFieldCollation.NullDirection.UNSPECIFIED) => - pair._1.desc_nulls_first + org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING, + org.apache.calcite.rel.RelFieldCollation.NullDirection.UNSPECIFIED) => + SortOrder(pair._1.expr, Descending, NullsFirst, Seq.empty) case (org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING, - org.apache.calcite.rel.RelFieldCollation.NullDirection.LAST) => - pair._1.desc_nulls_last + org.apache.calcite.rel.RelFieldCollation.NullDirection.LAST) => + SortOrder(pair._1.expr, Descending, NullsLast, Seq.empty) case (org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING, - org.apache.calcite.rel.RelFieldCollation.NullDirection.FIRST) => - pair._1.desc_nulls_first + org.apache.calcite.rel.RelFieldCollation.NullDirection.FIRST) => + SortOrder(pair._1.expr, Descending, NullsFirst, Seq.empty) case _ => throw new IllegalArgumentException } }) - inputs.get(0).sort(columns: _*) + Sort(columns, true, plan) } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala index 5c0b0511b7..4a06499092 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala @@ -33,12 +33,13 @@ import org.apache.kylin.metadata.tuple.TupleInfo import org.apache.kylin.query.implicits.sessionToQueryContext import org.apache.kylin.query.relnode.{KapRel, OLAPContext} import org.apache.kylin.query.util.{RuntimeHelper, SparderDerivedUtil} -import org.apache.spark.sql.{Column, DataFrame, Row, 
SparderEnv, SparkOperation, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.execution.utils.SchemaProcessor import org.apache.spark.sql.functions.col import org.apache.spark.sql.manager.SparderLookupManager import org.apache.spark.sql.types.{ArrayType, DataTypes, DoubleType, StringType, StructField, StructType} import org.apache.spark.sql.util.SparderTypeUtil +import org.apache.spark.sql.{Column, DataFrame, Row, SparderEnv, SparkInternalAgent, SparkOperation, SparkSession} import scala.collection.JavaConverters._ @@ -52,20 +53,20 @@ object TableScanPlan extends LogEx { r } - private[runtime] val cacheDf: ThreadLocal[ConcurrentHashMap[String, DataFrame]] = new ThreadLocal[ConcurrentHashMap[String, DataFrame]] { - override def initialValue: ConcurrentHashMap[String, DataFrame] = { - new ConcurrentHashMap[String, DataFrame] + private[runtime] val cachePlan: ThreadLocal[ConcurrentHashMap[String, LogicalPlan]] = new ThreadLocal[ConcurrentHashMap[String, LogicalPlan]] { + override def initialValue: ConcurrentHashMap[String, LogicalPlan] = { + new ConcurrentHashMap[String, LogicalPlan] } } - def createOLAPTable(rel: KapRel): DataFrame = logTime("table scan", debug = true) { + def createOLAPTable(rel: KapRel): LogicalPlan = logTime("table scan", debug = true) { val session: SparkSession = SparderEnv.getSparkSession val olapContext = rel.getContext val context = olapContext.storageContext val prunedSegments = context.getPrunedSegments val prunedStreamingSegments = context.getPrunedStreamingSegments val realizations = olapContext.realization.getRealizations.asScala.toList - realizations.map(_.asInstanceOf[NDataflow]) + val plans = realizations.map(_.asInstanceOf[NDataflow]) .filter(dataflow => (!dataflow.isStreaming && !context.isBatchCandidateEmpty) || (dataflow.isStreaming && !context.isStreamCandidateEmpty) || isSegmentsEmpty(prunedSegments, prunedStreamingSegments)) @@ -75,10 +76,14 @@ object TableScanPlan extends LogEx { } else { tableScan(rel, dataflow, olapContext, session, prunedSegments, context.getCandidate) } - }).reduce(_.union(_)) + }) + + // Wrap the result in a Project: Union can only compute its output once the plan has been analyzed, + // so projecting plans.head.output lets downstream operators read this node's output attributes directly.
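A self-contained illustration of the Project-over-Union wrapping described in the comment above; the toy schemas and session setup are assumptions, and only the wrapping itself mirrors the patch.

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}

object UnionWrapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("union-wrap-sketch").getOrCreate()
    import spark.implicits._

    val plans: Seq[LogicalPlan] = Seq(
      Seq((1, "a")).toDF("id", "name").queryExecution.analyzed,
      Seq((2, "b")).toDF("id", "name").queryExecution.analyzed)

    // Union derives its output from its children, so pin the output to the first
    // child's attributes through a Project, the same way createOLAPTable does.
    val wrapped = Project(plans.head.output, Union(plans))
    println(wrapped.output.map(_.name)) // id, name - immediately usable by downstream plan builders

    Dataset.ofRows(spark, wrapped).show() // two rows
    spark.stop()
  }
}
```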
+ Project(plans.head.output, Union(plans)) } - def createMetadataTable(rel: KapRel): DataFrame = { + def createMetadataTable(rel: KapRel): LogicalPlan = { val session: SparkSession = SparderEnv.getSparkSession val olapContext = rel.getContext val allFields: util.List[TblColRef] = new util.ArrayList[TblColRef] @@ -122,13 +127,14 @@ object TableScanPlan extends LogEx { }) val schema: StructType = StructType(structTypes) - session.createDataFrame(result, schema) + session.createDataFrame(result, schema).queryExecution.logical } // prunedSegments is null - private def tableScanEmptySegment(rel: KapRel): DataFrame = { + private def tableScanEmptySegment(rel: KapRel): LogicalPlan = { logInfo("prunedSegments is null") - val df = SparkOperation.createEmptyDataFrame( + // KE-41874 DataFrame convert Logical Plan + SparkOperation.createEmptyDataFrame( StructType(rel.getColumnRowType .getAllColumns.asScala .map(column => StructField( @@ -136,11 +142,7 @@ object TableScanPlan extends LogEx { SparderTypeUtil.toSparkType(column.getType)) ) ) - ) - val cols = df.schema.map(structField => { - col(structField.name) - }) - df.select(cols: _*) + ).queryExecution.logical } def isSegmentsEmpty(prunedSegments: util.List[NDataSegment], prunedStreamingSegments: util.List[NDataSegment]): Boolean = { @@ -151,7 +153,7 @@ object TableScanPlan extends LogEx { def tableScan(rel: KapRel, dataflow: NDataflow, olapContext: OLAPContext, session: SparkSession, prunedSegments: util.List[NDataSegment], - candidate: NLayoutCandidate): DataFrame = { + candidate: NLayoutCandidate): LogicalPlan = { val prunedPartitionMap = olapContext.storageContext.getPrunedPartitions olapContext.resetSQLDigest() //TODO: refactor @@ -175,31 +177,31 @@ object TableScanPlan extends LogEx { val path = fileList.mkString(",") + olapContext.isExactlyFastBitmap printLogInfo(basePath, dataflow.getId, cuboidLayout.getId, prunedSegments, prunedPartitionMap) - val cached = cacheDf.get().getOrDefault(path, null) - val df = if (cached != null && !cached.sparkSession.sparkContext.isStopped) { - logInfo(s"Reuse df: ${cuboidLayout.getId}") + val pruningInfo = prunedSegments.asScala.map { seg => + if (prunedPartitionMap != null) { + val partitions = prunedPartitionMap.get(seg.getId) + seg.getId + ":" + Joiner.on("|").join(partitions) + } else { + seg.getId + } + }.mkString(",") + + val cached = cachePlan.get().getOrDefault(path, null) + var plan = if (cached != null && !SparderEnv.getSparkSession.sparkContext.isStopped) { + logInfo(s"Reuse plan: ${cuboidLayout.getId}") cached } else { - val pruningInfo = prunedSegments.asScala.map { seg => - if (prunedPartitionMap != null) { - val partitions = prunedPartitionMap.get(seg.getId) - seg.getId + ":" + Joiner.on("|").join(partitions) - } else { - seg.getId - } - }.mkString(",") - val newDf = session.kylin + val newPlan = session.kylin .isFastBitmapEnabled(olapContext.isExactlyFastBitmap) .bucketingEnabled(bucketEnabled(olapContext, cuboidLayout)) .cuboidTable(dataflow, cuboidLayout, pruningInfo) - .toDF(columnNames: _*) - logInfo(s"Cache df: ${cuboidLayout.getId}") - cacheDf.get().put(path, newDf) - newDf + cachePlan.get().put(path, newPlan) + newPlan } - val (schema, newDF) = buildSchema(df, tableName, cuboidLayout, rel, olapContext, dataflow) - newDF.select(schema: _*) + plan = SparkOperation.projectAsAlias(columnNames, plan) + val (schema, newPlan) = buildSchema(plan, tableName, cuboidLayout, rel, olapContext, dataflow) + SparkOperation.project(schema, newPlan) } def bucketEnabled(context: OLAPContext, layout: 
LayoutEntity): Boolean = { @@ -217,9 +219,9 @@ object TableScanPlan extends LogEx { && context.getOuterJoinParticipants.iterator().next() == layout.getShardByColumnRefs.get(0)) } - def buildSchema(df: DataFrame, tableName: String, cuboidLayout: LayoutEntity, rel: KapRel, - olapContext: OLAPContext, dataflow: NDataflow): (Seq[Column], DataFrame) = { - var newDF = df + def buildSchema(plan: LogicalPlan, tableName: String, cuboidLayout: LayoutEntity, rel: KapRel, + olapContext: OLAPContext, dataflow: NDataflow): (Seq[Column], LogicalPlan) = { + var newPlan = plan val isBatchOfHybrid = olapContext.realization.isInstanceOf[HybridRealization] && dataflow.getModel.isFusionModel && !dataflow.isStreaming val mapping = new NLayoutToGridTableMapping(cuboidLayout, isBatchOfHybrid) val context = olapContext.storageContext @@ -257,7 +259,7 @@ object TableScanPlan extends LogEx { olapContext.returnTupleInfo, context.getCandidate) if (derived.hasDerived) { - newDF = derived.joinDerived(newDF) + newPlan = derived.joinDerived(newPlan) } var topNMapping: Map[Int, Column] = Map.empty // query will only has one Top N measure. @@ -266,8 +268,10 @@ object TableScanPlan extends LogEx { } if (topNMetric.isDefined) { val topNFieldIndex = mapping.getMetricsIndices(List(topNMetric.get).asJava).head - val tp = processTopN(topNMetric.get, newDF, topNFieldIndex, olapContext.returnTupleInfo, tableName) - newDF = tp._1 + + val df = SparkInternalAgent.getDataFrame(SparderEnv.getSparkSession, newPlan) + val tp = processTopN(topNMetric.get, df, topNFieldIndex, olapContext.returnTupleInfo, tableName) + newPlan = tp._1.queryExecution.analyzed topNMapping = tp._2 } val tupleIdx = getTupleIdx(dimensionsD, @@ -278,10 +282,9 @@ object TableScanPlan extends LogEx { derived, tableName, rel.getColumnRowType.getAllColumns.asScala.toList, - df.schema, - gtColIdx, - tupleIdx, - topNMapping), newDF) + newPlan, + (gtColIdx, tupleIdx), + topNMapping), newPlan) } def toLayoutPath(dataflow: NDataflow, cuboidId: Long, basePath: String, seg: NDataSegment): String = { @@ -313,7 +316,7 @@ object TableScanPlan extends LogEx { }.mkString(",") logInfo(s"""Path is: {"base":"$basePath","dataflow":"${dataflowId}","segments":{$prunedSegmentInfo},"layout": ${cuboidId}""") } - logInfo(s"size is ${cacheDf.get().size()}") + logInfo(s"size is ${cachePlan.get().size()}") } private def processTopN(topNMetric: FunctionDesc, df: DataFrame, topNFieldIndex: Int, tupleInfo: TupleInfo, tableName: String): (DataFrame, Map[Int, Column]) = { @@ -433,7 +436,7 @@ object TableScanPlan extends LogEx { tupleIdx } - def createLookupTable(rel: KapRel): DataFrame = { + def createLookupTable(rel: KapRel): LogicalPlan = { val start = System.currentTimeMillis() val olapContext = rel.getContext @@ -447,17 +450,18 @@ object TableScanPlan extends LogEx { val snapshotResPath = tableMetadataManager.getTableDesc(lookupTableName).getLastSnapshotPath val config = instance.getConfig val dataFrameTableName = instance.getProject + "@" + lookupTableName - val lookupDf = SparderLookupManager.getOrCreate(dataFrameTableName, snapshotResPath, config) + val lookupPlan = SparderLookupManager.getOrCreate(dataFrameTableName, snapshotResPath, config) val olapTable = olapContext.firstTableScan.getOlapTable val alisTableName = olapContext.firstTableScan.getBackupAlias - val newNames = lookupDf.schema.fieldNames.map { name => - val gTInfoSchema = SchemaProcessor.parseDeriveTableSchemaName(name) - SchemaProcessor.generateDeriveTableSchemaName(alisTableName, + val newNames = lookupPlan.output.map { c => 
+ val gTInfoSchema = SchemaProcessor.parseDeriveTableSchemaName(c.name) + val name = SchemaProcessor.generateDeriveTableSchemaName(alisTableName, gTInfoSchema.columnId, gTInfoSchema.columnName) - }.array - val newNameLookupDf = lookupDf.toDF(newNames: _*) + name + } + val newNameLookupPlan = SparkOperation.projectAsAlias(newNames, lookupPlan) val colIndex = olapTable.getSourceColumns.asScala .map( column => @@ -473,9 +477,9 @@ object TableScanPlan extends LogEx { ) ) }) - val df = newNameLookupDf.select(colIndex: _*) + val plan = SparkOperation.project(colIndex, newNameLookupPlan) logInfo(s"Gen lookup table scan cost Time :${System.currentTimeMillis() - start} ") - df + plan } private def expandDerived(layoutCandidate: NLayoutCandidate, @@ -498,10 +502,10 @@ object TableScanPlan extends LogEx { expanded } - def createSingleRow(): DataFrame = { + def createSingleRow(): LogicalPlan = { val session = SparderEnv.getSparkSession val rows = List.fill(1)(Row.fromSeq(List[Object]())) val rdd = session.sparkContext.makeRDD(rows) - session.createDataFrame(rdd, StructType(List[StructField]())) + session.createDataFrame(rdd, StructType(List[StructField]())).queryExecution.logical } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/UnionPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/UnionPlan.scala index 4530c42a26..eaac2027f5 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/UnionPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/UnionPlan.scala @@ -19,27 +19,21 @@ package org.apache.kylin.query.runtime.plan import org.apache.calcite.DataContext import org.apache.kylin.query.relnode.KapUnionRel -import org.apache.spark.sql.DataFrame - -import scala.collection.JavaConverters._ +import org.apache.spark.sql.SparkOperation +import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LogicalPlan, Union} +import org.apache.spark.sql.functions.col object UnionPlan { - - def union( - inputs: java.util.List[DataFrame], - rel: KapUnionRel, - dataContext: DataContext): DataFrame = { - var df = inputs.get(0) - val drop = inputs.asScala.drop(1) - if (rel.all) { - for (other <- drop) { - df = df.union(other) - } + def union(plans: Seq[LogicalPlan], + rel: KapUnionRel, + dataContext: DataContext): LogicalPlan = { + val unionPlan = if (rel.all) { + Union(plans) } else { - for (other <- drop) { - df = df.union(other).distinct() - } + val uPlan = Union(plans) + Deduplicate(plans.head.output, uPlan) } - df + + SparkOperation.project(plans.head.output.map(c => col(c.name)), unionPlan) } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ValuesPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ValuesPlan.scala index ef9269a88a..53d9a6c8f2 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ValuesPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ValuesPlan.scala @@ -18,27 +18,28 @@ package org.apache.kylin.query.runtime.plan import org.apache.kylin.query.relnode.KapValuesRel -import org.apache.spark.sql.{DataFrame, Row, SparkOperation} +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.types.{StructField, StructType} import 
org.apache.spark.sql.util.SparderTypeUtil import scala.collection.JavaConverters._ object ValuesPlan { - def values(rel: KapValuesRel): DataFrame = { + def values(rel: KapValuesRel): LogicalPlan = { val schema = StructType(rel.getRowType.getFieldList.asScala.map { field => StructField( field.getName, SparderTypeUtil.convertSqlTypeToSparkType(field.getType)) }) + val output = schema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) + val rows = rel.tuples.asScala.map { tp => Row.fromSeq(tp.asScala.map(lit => SparderTypeUtil.getValueFromRexLit(lit))) - }.asJava - if (rel.tuples.size() == 0) { - SparkOperation.createEmptyDataFrame(schema) - } else { - SparkOperation.createConstantDataFrame(rows, schema) } + + LocalRelation.fromExternalRows(output, rows) } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/WindowPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/WindowPlan.scala index 7a7fcb46b8..38581a4963 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/WindowPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/WindowPlan.scala @@ -19,27 +19,29 @@ package org.apache.kylin.query.runtime.plan import java.sql.Date import java.util.{Calendar, Locale} + import org.apache.calcite.DataContext import org.apache.calcite.rel.RelCollationImpl import org.apache.calcite.rel.RelFieldCollation.Direction import org.apache.calcite.rex.RexInputRef import org.apache.calcite.util.NlsString import org.apache.kylin.common.util.DateFormat +import org.apache.kylin.engine.spark.utils.LogEx import org.apache.kylin.query.relnode.{KapProjectRel, KapWindowRel} import org.apache.kylin.query.runtime.SparderRexVisitor -import org.apache.spark.internal.Logging -import org.apache.spark.sql.KapFunctions._ +import org.apache.spark.sql.KapFunctions.k_lit import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.expressions.{Window, WindowSpec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.LongType import org.apache.spark.sql.util.SparderTypeUtil -import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.{Column, SparkOperation} import scala.collection.JavaConverters._ -object WindowPlan extends Logging { +object WindowPlan extends LogEx { // the function must have sort val sortSpecified = List("CUME_DIST", "LEAD", "RANK", "DENSE_RANK", "ROW_NUMBER", "NTILE", "LAG") @@ -54,23 +56,23 @@ object WindowPlan extends Logging { "LEAD" ) - def window(input: java.util.List[DataFrame], - rel: KapWindowRel, datacontex: DataContext): DataFrame = { + def window(plan: LogicalPlan, + rel: KapWindowRel, + datacontex: DataContext): LogicalPlan = { val start = System.currentTimeMillis() var windowCount = 0 rel.groups.asScala.head.upperBound - val df = input.get(0) - val columnSize = df.schema.length + val columnSize = plan.output.size - val columns = df.schema.fieldNames.map(col) + val columns = plan.output.map(c => col(c.name)) val constantMap = rel.getConstants.asScala .map(_.getValue) .zipWithIndex .map { entry => (entry._2 + columnSize, entry._1) }.toMap[Int, Any] - val visitor = new SparderRexVisitor(df, + val visitor = new SparderRexVisitor(plan, rel.getInput.getRowType, datacontex) val constants = rel.getConstants.asScala @@ -238,9 +240,9 @@ object WindowPlan 
extends Logging { } val selectColumn = columns ++ windows - val window = df.select(selectColumn: _*) + val windowPlan = SparkOperation.project(selectColumn, plan) logInfo(s"Gen window cost Time :${System.currentTimeMillis() - start} ") - window + windowPlan } // scalastyle:off @@ -259,7 +261,7 @@ object WindowPlan extends Logging { col.expr.prettyName.toUpperCase(Locale.ROOT) match { case "CURRENT_DATE" => DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN) - .format(DateTimeUtils.currentTimestamp() / 1000) + .format(DateTimeUtils.currentTimestamp() / 1000) case _ => col.expr } } @@ -271,6 +273,7 @@ object WindowPlan extends Logging { } } } + def constantValue(value: Any) = { value match { case v: NlsString => v.getValue diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala index 2109b0ecdb..50fd84bb7f 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/RuntimeHelper.scala @@ -26,9 +26,10 @@ import org.apache.kylin.metadata.project.NProjectManager import org.apache.spark.internal.Logging import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.utils.SchemaProcessor import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType} +import org.apache.spark.sql.types.{ArrayType, DataTypes} import org.apache.spark.sql.udf.UdfManager import scala.collection.JavaConverters._ @@ -51,16 +52,17 @@ object RuntimeHelper extends Logging { name } - def gtSchemaToCalciteSchema( - primaryKey: ImmutableBitSet, - derivedUtil: SparderDerivedUtil, - factTableName: String, - allColumns: List[TblColRef], - sourceSchema: StructType, - gtColIdx: Array[Int], - tupleIdx: Array[Int], - topNMapping: Map[Int, Column] - ): Seq[Column] = { + // S53 Fix https://olapio.atlassian.net/browse/KE-42473 + def gtSchemaToCalciteSchema(primaryKey: ImmutableBitSet, + derivedUtil: SparderDerivedUtil, + factTableName: String, + allColumns: List[TblColRef], + plan: LogicalPlan, + colIdx: (Array[Int], Array[Int]), + topNMapping: Map[Int, Column]): Seq[Column] = { + val gtColIdx = colIdx._1 + val tupleIdx = colIdx._2 + val sourceSchema = plan.output val gTInfoNames = SchemaProcessor.buildFactTableSortNames(sourceSchema) val calciteToGTinfo = tupleIdx.zipWithIndex.toMap var deriveMap: Map[Int, Column] = Map.empty @@ -98,7 +100,12 @@ object RuntimeHelper extends Logging { val projectConfig = NProjectManager.getProjectConfig(derivedUtil.model.getProject) // may have multi TopN measures. 
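The topNIndexs change just below guards the ArrayType scan with plan.resolved. The sketch here (toy data, hypothetical object name) shows the distinction it relies on: attribute types can be read off an analyzed plan, while an unresolved attribute throws when asked for its dataType.

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.types.ArrayType

import scala.util.Try

object ResolvedGuardSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("resolved-guard-sketch").getOrCreate()
    import spark.implicits._

    val analyzed = Seq((1, Seq("a", "b"))).toDF("id", "tags").queryExecution.analyzed
    // Safe on a resolved plan: locate ArrayType (TopN-style) columns by data type.
    println(analyzed.output.map(_.dataType).zipWithIndex.filter(_._1.isInstanceOf[ArrayType]))

    // Not safe before analysis: an unresolved attribute has no dataType yet.
    println(Try(UnresolvedAttribute("tags").dataType).isFailure) // true

    spark.stop()
  }
}
```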
- val topNIndexs = sourceSchema.fields.map(_.dataType).zipWithIndex.filter(_._1.isInstanceOf[ArrayType]) + val topNIndexs = if (plan.resolved) { + sourceSchema.map(_.dataType).zipWithIndex.filter(_._1.isInstanceOf[ArrayType]) + } else { + Seq.empty + } + allColumns.indices .zip(allColumns) .map { diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/SparderDerivedUtil.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/SparderDerivedUtil.scala index 67834c7d29..a8b38dccb3 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/SparderDerivedUtil.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/util/SparderDerivedUtil.scala @@ -18,23 +18,25 @@ package org.apache.kylin.query.util +import java.util + +import org.apache.calcite.sql.SqlKind import org.apache.kylin.guava30.shaded.common.collect.Maps import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate import org.apache.kylin.metadata.cube.model.NDataSegment -import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager} -import org.apache.kylin.metadata.model.util.scd2.SCD2NonEquiCondSimplification -import org.apache.calcite.sql.SqlKind import org.apache.kylin.metadata.model.DeriveInfo.DeriveType import org.apache.kylin.metadata.model.NonEquiJoinCondition.SimplifiedNonEquiJoinCondition -import org.apache.kylin.metadata.model.{DeriveInfo, TblColRef} +import org.apache.kylin.metadata.model.util.scd2.SCD2NonEquiCondSimplification +import org.apache.kylin.metadata.model.{DeriveInfo, NDataModel, NTableMetadataManager, TblColRef} import org.apache.kylin.metadata.tuple.TupleInfo +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint, LogicalPlan} import org.apache.spark.sql.derived.DerivedInfo import org.apache.spark.sql.execution.utils.{DeriveTableColumnInfo, SchemaProcessor} import org.apache.spark.sql.functions.col import org.apache.spark.sql.manager.SparderLookupManager -import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.{Column, SparkOperation} -import java.util import scala.collection.JavaConverters._ import scala.collection.mutable @@ -62,8 +64,8 @@ case class SparderDerivedUtil(gtInfoTableName: String, .reverse .foreach(entry => findDerivedColumn(entry._1, entry._2)) - val derivedColumnNameMapping: util.HashMap[DerivedInfo, Array[String]] = - Maps.newHashMap[DerivedInfo, Array[String]]() + val derivedColumnNameMapping: util.HashMap[DerivedInfo, Seq[String]] = + Maps.newHashMap[DerivedInfo, Seq[String]]() def findDerivedColumn(hostColIds: util.List[Integer], deriveInfo: DeriveInfo): Unit = { @@ -140,23 +142,23 @@ case class SparderDerivedUtil(gtInfoTableName: String, cuboidIdx } - def joinDerived(dataFrame: DataFrame): DataFrame = { - var joinedDf: DataFrame = dataFrame + def joinDerived(plan: LogicalPlan): LogicalPlan = { + var joinedPlan: LogicalPlan = plan val joinedLookups = scala.collection.mutable.Set[String]() for (hostToDerived <- hostToDeriveds) { if (hostToDerived.deriveType != DeriveType.PK_FK) { // PK_FK derive does not need joining if (!joinedLookups.contains(hostToDerived.aliasTableName)) { - joinedDf = joinLookUpTable(dataFrame.schema.fieldNames, - joinedDf, + joinedPlan = joinLookUpTable(plan.output.map(c => c.name).toArray, + joinedPlan, hostToDerived) joinedLookups.add(hostToDerived.aliasTableName) } } } - joinedDf + joinedPlan } def getLookupTablePathAndPkIndex(deriveInfo: DeriveInfo): (String, String, String, 
Array[Int]) = { @@ -179,28 +181,25 @@ case class SparderDerivedUtil(gtInfoTableName: String, } def joinLookUpTable(gTInfoNames: Array[String], - df: DataFrame, - derivedInfo: DerivedInfo): DataFrame = { + plan: LogicalPlan, + derivedInfo: DerivedInfo): LogicalPlan = { val lookupTableAlias = derivedInfo.aliasTableName - val lookupDf = + val lookupPlan = SparderLookupManager.getOrCreate(derivedInfo.tableIdentity, derivedInfo.path, dataSeg.getConfig) - val newNames = lookupDf.schema.fieldNames - .map { name => - SchemaProcessor.parseDeriveTableSchemaName(name) - } + val newNames = lookupPlan.output + .map(c => SchemaProcessor.parseDeriveTableSchemaName(c.name)) .sortBy(_.columnId) .map( deriveInfo => DeriveTableColumnInfo(lookupTableAlias, deriveInfo.columnId, deriveInfo.columnName).toString) - .array derivedColumnNameMapping.put(derivedInfo, newNames) - val newNameLookupDf = lookupDf.toDF(newNames: _*) + val newNameLookupPlan = SparkOperation.projectAsAlias(newNames, lookupPlan) if (derivedInfo.fkIdx.length != derivedInfo.pkIdx.length) { throw new IllegalStateException( s"unequal host key num ${derivedInfo.fkIdx.length} " + @@ -224,13 +223,12 @@ case class SparderDerivedUtil(gtInfoTableName: String, val simplifiedCond = SCD2NonEquiCondSimplification.INSTANCE .convertToSimplifiedSCD2Cond(derivedInfo.join) .getSimplifiedNonEquiJoinConditions - joinCol = genNonEquiJoinColumn(newNameLookupDf, gTInfoNames, joinCol, simplifiedCond.asScala) + joinCol = genNonEquiJoinColumn(newNameLookupPlan, gTInfoNames, joinCol, simplifiedCond.asScala) } - df.join(newNameLookupDf, joinCol, derivedInfo.join.getType) - + Join(plan, newNameLookupPlan, JoinType.apply(derivedInfo.join.getType), Option(joinCol.expr), JoinHint.NONE) } - def genNonEquiJoinColumn(newNameLookupDf: DataFrame, + def genNonEquiJoinColumn(newNameLookupPlan: LogicalPlan, gTInfoNames: Array[String], colOrigin: Column, simplifiedConds: mutable.Buffer[SimplifiedNonEquiJoinCondition]): Column = { @@ -238,7 +236,7 @@ case class SparderDerivedUtil(gtInfoTableName: String, var joinCol = colOrigin for (simplifiedCond <- simplifiedConds) { val colFk = col(gTInfoNames.apply(indexOnTheCuboidValues(simplifiedCond.getFk))) - val colPk = col(newNameLookupDf.schema.fieldNames.apply(simplifiedCond.getPk.getColumnDesc.getZeroBasedIndex)) + val colPk = col(newNameLookupPlan.output.apply(simplifiedCond.getPk.getColumnDesc.getZeroBasedIndex).name) val colOp = simplifiedCond.getOp val newCol = colOp match { diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala index db5e9b41ae..8b393c2d63 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala @@ -23,12 +23,15 @@ import java.sql.Timestamp import org.apache.kylin.common.KylinConfig import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataflow, NDataflowManager} import org.apache.kylin.metadata.model.FusionModelManager +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.datasource.storage.StorageStoreFactory import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.StructType import scala.collection.mutable.{HashMap => MutableHashMap} +import io.kyligence.kap.secondstorage.SecondStorage + class KylinDataFrameManager(sparkSession: SparkSession) { private var 
extraOptions = new MutableHashMap[String, String]() private var userSpecifiedSchema: Option[StructType] = None @@ -70,7 +73,7 @@ class KylinDataFrameManager(sparkSession: SparkSession) { option("bucketingEnabled", bucketingEnabled) } - def cuboidTable(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: String): DataFrame = { + def cuboidTable(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: String): LogicalPlan = { format("parquet") option("project", dataflow.getProject) option("dataflowId", dataflow.getUuid) @@ -85,18 +88,24 @@ class KylinDataFrameManager(sparkSession: SparkSession) { val partition = dataflow.getModel.getPartitionDesc.getPartitionDateColumnRef val id = layout.getOrderedDimensions.inverse().get(partition) - var df = read(dataflow, layout, pruningInfo) + var plan = read(dataflow, layout, pruningInfo) if (id != null && end != Long.MinValue) { - df = df.filter(col(id.toString).geq(new Timestamp(end))) + val filterPlan = Filter(col(id.toString).geq(new Timestamp(end)).expr, plan) + plan = SparkOperation.project(filterPlan.output.map(c => col(c.name)), filterPlan) } - return df + return plan } read(dataflow, layout, pruningInfo) } - def read(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: String): DataFrame = { + def read(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: String): LogicalPlan = { + val df = SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo) + if (df.isEmpty) { StorageStoreFactory.create(dataflow.getModel.getStorageType) .read(dataflow, layout, sparkSession, extraOptions.toMap) + } else { + df.get.queryExecution.analyzed + } } /** diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparkOperation.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparkOperation.scala index 1865f7ab79..1b8cbb9a7d 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparkOperation.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparkOperation.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GroupingSets -import org.apache.spark.sql.catalyst.plans.logical.Aggregate -import org.apache.spark.sql.functions.{count, lit} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project} +import org.apache.spark.sql.functions.{col, count, lit} import org.apache.spark.sql.types.StructType object SparkOperation { @@ -61,27 +61,51 @@ object SparkOperation { SparderEnv.getSparkSession.createDataFrame(rows, structType) } - def agg(aggArgc: AggArgc): DataFrame = { + def project(cols: Seq[Column], plan: LogicalPlan): LogicalPlan = { + Project(cols.map(_.named), plan) + } + + def projectAsAlias(newNames: Seq[String], plan: LogicalPlan): LogicalPlan = { + val newCols = plan.output.zip(newNames).map { case (oldAttribute, newName) => + col(oldAttribute.name).as(newName) + } + project(newCols, plan) + } + + def filter(condition: Column, plan: LogicalPlan): LogicalPlan = { + Filter(condition.named, plan) + } + + def agg(aggArgc: AggArgc): LogicalPlan = { if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty && !aggArgc.isSimpleGroup && aggArgc.groupSets.nonEmpty) { - Dataset.ofRows( - aggArgc.dataFrame.sparkSession, - Aggregate( - Seq(GroupingSets(aggArgc.groupSets.map(gs => gs.map(_.expr)), - aggArgc.group.map(_.expr))), - aggArgc.group.map(_.named) ++ aggArgc.agg.map(_.named), - 
aggArgc.dataFrame.queryExecution.logical - ) + Aggregate( + Seq(GroupingSets(aggArgc.groupSets.map(gs => gs.map(_.expr)), + aggArgc.group.map(_.expr))), + aggArgc.group.map(_.named) ++ aggArgc.agg.map(_.named), + aggArgc.plan ) } else if (aggArgc.agg.nonEmpty && aggArgc.group.nonEmpty) { - aggArgc.dataFrame - .groupBy(aggArgc.group: _*) - .agg(aggArgc.agg.head, aggArgc.agg.drop(1): _*) + Aggregate( + aggArgc.group.map(_.expr), + (aggArgc.group ++ aggArgc.agg).map(_.named), + aggArgc.plan + ) } else if (aggArgc.agg.isEmpty && aggArgc.group.nonEmpty) { - aggArgc.dataFrame.groupBy(aggArgc.group: _*).agg(count(lit("1"))).select(aggArgc.group: _*) + val aggPlan = Aggregate( + aggArgc.group.map(_.expr), + (aggArgc.group ++ Seq.apply(count(lit("1")))).map(_.named), + aggArgc.plan + ) + + Project(aggArgc.group.map(_.named), aggPlan) } else if (aggArgc.agg.nonEmpty && aggArgc.group.isEmpty) { - aggArgc.dataFrame.agg(aggArgc.agg.head, aggArgc.agg.drop(1): _*) + Aggregate( + Nil, + aggArgc.agg.map(_.named), + aggArgc.plan + ) } else { - aggArgc.dataFrame + aggArgc.plan } } @@ -102,4 +126,8 @@ object SparkOperation { */ } -case class AggArgc(dataFrame: DataFrame, group: List[Column], agg: List[Column], groupSets: List[List[Column]], isSimpleGroup: Boolean) \ No newline at end of file +case class AggArgc(plan: LogicalPlan, + group: List[Column], + agg: List[Column], + groupSets: List[List[Column]], + isSimpleGroup: Boolean) \ No newline at end of file diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/execution/utils/SchemaProcessor.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/execution/utils/SchemaProcessor.scala index caec2e369a..161850016c 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/execution/utils/SchemaProcessor.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/execution/utils/SchemaProcessor.scala @@ -17,18 +17,19 @@ */ package org.apache.spark.sql.execution.utils +import java.util + import org.apache.kylin.common.util.ImmutableBitSet import org.apache.kylin.common.{KapConfig, KylinConfig} - -import java.util import org.apache.kylin.metadata.cube.gridtable.NLayoutToGridTableMapping import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow, NDataflowManager} -import org.apache.kylin.query.runtime.plan.TableScanPlan import org.apache.kylin.metadata.model.{ColumnDesc, FunctionDesc} -import org.apache.spark.sql.{LayoutEntityConverter, SparkSession} +import org.apache.kylin.query.runtime.plan.TableScanPlan +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SparderConstants.COLUMN_NAME_SEPARATOR import org.apache.spark.sql.util.{SparderConstants, SparderTypeUtil} +import org.apache.spark.sql.{LayoutEntityConverter, SparkSession} import scala.collection.JavaConverters._ @@ -37,7 +38,7 @@ object SchemaProcessor { def buildGTSchema(cuboid: LayoutEntity, mapping: NLayoutToGridTableMapping, - tableName: String):Seq[String] = { + tableName: String): Seq[String] = { genColumnNames(tableName, cuboid, mapping) } @@ -79,7 +80,7 @@ object SchemaProcessor { SparderTypeUtil.toSparkType(function.getReturnDataType, true) } case "COUNT" => LongType - case x if x.startsWith("TOP_N") => + case x if x.startsWith("TOP_N") => val fields = function.getParameters.asScala.drop(1).map(p => StructField(s"DIMENSION_${p.getColRef.getName}", SparderTypeUtil.toSparkType(p.getColRef.getType)) ) @@ -97,7 +98,7 @@ object 
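SparkOperation.agg above now assembles Aggregate nodes by hand for the grouping-sets, group-with-aggregate, group-only and aggregate-only branches. Below is a stripped-down sketch of the plain group-with-aggregate branch; it again assumes placement in org.apache.spark.sql so that the private[sql] Column.named and Dataset.ofRows are visible, and every name in it is illustrative.

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.functions.{col, sum}

object AggregateNodeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("aggregate-node-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("k", "v")
    val group = col("k")
    val agg = sum(col("v")).as("total")

    // Mirrors Aggregate(group.map(_.expr), (group ++ agg).map(_.named), plan) in the patch.
    val aggregate = Aggregate(
      Seq(group.expr),
      Seq(group.named, agg.named),
      df.queryExecution.logical)

    Dataset.ofRows(spark, aggregate).show() // k, total
    spark.stop()
  }
}
```

The group-only branch in the patch adds a count(lit("1")) aggregate and then projects it away, which keeps the Aggregate node well-formed when a query groups without aggregating.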
SchemaProcessor { case "COLLECT_SET" => val parameter = function.getParameters.get(0) ArrayType(SparderTypeUtil.toSparkType(parameter.getColRef.getType)) - case _ => SparderTypeUtil.toSparkType(function.getReturnDataType) + case _ => SparderTypeUtil.toSparkType(function.getReturnDataType) } } @@ -113,7 +114,7 @@ object SchemaProcessor { val path: String = TableScanPlan.toLayoutPath(df, nCuboidLayout.getId, base, latestReadySegment) val schema: StructType = sparkSession.read.parquet(path).schema val schemaFromNCuboidLayout: StructType = LayoutEntityConverter.genCuboidSchemaFromNCuboidLayout(nCuboidLayout) - if (!(schema == StructType.removeMetadata("__CHAR_VARCHAR_TYPE_STRING",schemaFromNCuboidLayout))) { + if (!(schema == StructType.removeMetadata("__CHAR_VARCHAR_TYPE_STRING", schemaFromNCuboidLayout))) { throw new RuntimeException(s"Check schema failed : dfName: $dfName, layoutId: ${nCuboidLayout.getId}, actual: ${schemaFromNCuboidLayout.treeString}, expect: ${schema.treeString}") } } @@ -147,12 +148,11 @@ object SchemaProcessor { AggColumnInfo(index, aggFuncName, hash, aggArgs: _*).toString } - def buildFactTableSortNames(sourceSchema: StructType): Array[String] = { - sourceSchema.fieldNames - .filter(name => name.startsWith("F" + COLUMN_NAME_SEPARATOR) || name.startsWith("R" + COLUMN_NAME_SEPARATOR)) - .map(name => (factTableSchemaNameToColumnId(name), name)) + def buildFactTableSortNames(sourceSchema: Seq[Attribute]): Array[String] = { + sourceSchema.filter(att => att.name.startsWith("F" + COLUMN_NAME_SEPARATOR) || att.name.startsWith("R" + COLUMN_NAME_SEPARATOR)) + .map(att => (factTableSchemaNameToColumnId(att.name), att.name)) .sortBy(_._1) - .map(_._2) + .map(_._2).toArray } def buildSchemaWithRawTable(columnDescs: Array[ColumnDesc]): StructType = { @@ -165,8 +165,8 @@ object SchemaProcessor { } def genTopNSchema(advanceTableName: String, - colId: Int, - columnName: String = "N"): String = { + colId: Int, + columnName: String = "N"): String = { TopNColumnInfo(advanceTableName, colId, columnName).toString } @@ -188,14 +188,14 @@ sealed abstract class ColumnInfo(tableName: String, case class FactTableCulumnInfo(tableName: String, columnId: Int, columnName: String) - extends ColumnInfo(tableName, columnId, columnName) { + extends ColumnInfo(tableName, columnId, columnName) { override val prefix: String = "F" } case class DeriveTableColumnInfo(tableName: String, columnId: Int, columnName: String) - extends ColumnInfo(tableName, columnId, columnName) { + extends ColumnInfo(tableName, columnId, columnName) { override val prefix: String = "D" } @@ -204,7 +204,7 @@ case class AggColumnInfo(index: Int, hash: String, args: String*) { override def toString: String = - s"$funcName(${args.mkString("_")})_${index}_$hash" + s"$funcName${args.mkString("_")}_${index}_$hash" } case class TopNColumnInfo(tableName: String, columnId: Int, columnName: String) diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/manager/SparderLookupManager.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/manager/SparderLookupManager.scala index 2ee6551ada..a9e3634cb6 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/manager/SparderLookupManager.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/manager/SparderLookupManager.scala @@ -17,43 +17,28 @@ */ package org.apache.spark.sql.manager -import java.util.concurrent.TimeUnit -import org.apache.kylin.guava30.shaded.common.cache.{Cache, CacheBuilder, RemovalListener, 
RemovalNotification} +import org.apache.hadoop.fs.Path import org.apache.kylin.common.{KapConfig, KylinConfig} -import org.apache.kylin.metadata.model.NTableMetadataManager -import org.apache.kylin.metadata.model.ColumnDesc +import org.apache.kylin.metadata.model.{ColumnDesc, NTableMetadataManager} +import org.apache.kylin.query.util.PartitionsFilter.{PARTITIONS, PARTITION_COL} import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.execution.utils.DeriveTableColumnInfo +import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.sql.{DataFrame, Dataset, Row, SparderEnv} import org.apache.spark.sql.util.SparderTypeUtil -import org.apache.kylin.query.util.PartitionsFilter.PARTITION_COL -import org.apache.kylin.query.util.PartitionsFilter.PARTITIONS +import org.apache.spark.sql.{SparderEnv, SparkOperation} -import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.collection.mutable.ListBuffer // scalastyle:off object SparderLookupManager extends Logging { - val DEFAULT_MAXSIZE = 100 - val DEFAULT_EXPIRE_TIME = 1 - val DEFAULT_TIME_UNIT = TimeUnit.HOURS - - val sourceCache: Cache[String, Dataset[Row]] = CacheBuilder.newBuilder - .maximumSize(DEFAULT_MAXSIZE) - .expireAfterWrite(DEFAULT_EXPIRE_TIME, DEFAULT_TIME_UNIT) - .removalListener(new RemovalListener[String, Dataset[Row]]() { - override def onRemoval( - notification: RemovalNotification[String, Dataset[Row]]): Unit = { - logInfo("Remove lookup table from spark : " + notification.getKey) - notification.getValue.unpersist() - } - }) - .build - .asInstanceOf[Cache[String, Dataset[Row]]] def create(name: String, sourcePath: String, - kylinConfig: KylinConfig): Dataset[Row] = { + kylinConfig: KylinConfig): LogicalPlan = { val names = name.split("@") val projectName = names.apply(0) val tableName = names.apply(1) @@ -67,7 +52,6 @@ object SparderLookupManager extends Logging { } val dfTableName = Integer.toHexString(System.identityHashCode(name)) - val orderedCol = new ListBuffer[(ColumnDesc, Int)] var partitionCol: (ColumnDesc, Int) = null for ((col, index) <- tableDesc.getColumns.zipWithIndex) { @@ -84,24 +68,31 @@ object SparderLookupManager extends Logging { options.put(PARTITIONS, String.join(",", tableDesc.getSnapshotPartitions.keySet())) options.put("mapreduce.input.pathFilter.class", "org.apache.kylin.query.util.PartitionsFilter") } + val originSchema = StructType(orderedCol.map { case (col, index) => StructField(col.getName, SparderTypeUtil.toSparkType(col.getType)) }) - val schema = StructType(orderedCol.map { case (col, index) => StructField(DeriveTableColumnInfo(dfTableName, index, col.getName).toString, SparderTypeUtil.toSparkType(col.getType)) }) - val resourcePath = KapConfig.getInstanceFromEnv.getReadHdfsWorkingDirectory + sourcePath + val aliasCols = orderedCol.map { + case (c, index) => + col(c.getName).as(DeriveTableColumnInfo(dfTableName, index, c.getName).toString) + } + val resourcePath = new Path(KapConfig.getInstanceFromEnv.getReadHdfsWorkingDirectory + sourcePath) + + val sparkSession = SparderEnv.getSparkSession - SparderEnv.getSparkSession.read.options(options) - .schema(originSchema) - .parquet(resourcePath) - .toDF(schema.fieldNames: _*) + val fileIndex = new 
InMemoryFileIndex(sparkSession, Seq(resourcePath), options.toMap, Option(originSchema)) + val fsRelation = HadoopFsRelation( + fileIndex, + partitionSchema = fileIndex.partitionSchema, + dataSchema = originSchema, + bucketSpec = None, + new ParquetFileFormat, + options.toMap)(sparkSession) + val plan = LogicalRelation(fsRelation) + SparkOperation.project(aliasCols, plan) } def getOrCreate(name: String, sourcePath: String, - kylinConfig: KylinConfig): DataFrame = { - val value = sourceCache.getIfPresent(sourcePath) - if (value != null) { - value - } else { - create(name, sourcePath, kylinConfig) - } + kylinConfig: KylinConfig): LogicalPlan = { + create(name, sourcePath, kylinConfig) } } diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java index 52bfa2de28..03a56b08df 100644 --- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java +++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java @@ -18,6 +18,7 @@ package org.apache.kylin.query.sql; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap; import org.apache.kylin.junit.annotation.MetadataInfo; import org.apache.kylin.metadata.cube.model.LayoutEntity; import org.apache.kylin.metadata.cube.model.NDataflowManager; @@ -26,16 +27,16 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.model.TimeRange; import org.apache.spark.sql.KylinDataFrameManager; import org.apache.spark.sql.SparkSession; -import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.springframework.util.ReflectionUtils; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap; - import lombok.val; import lombok.var; +@Disabled("Not for open source") @MetadataInfo(project = "streaming_test") class KylinDataFrameManagerTest { @@ -45,7 +46,7 @@ class KylinDataFrameManagerTest { val config = KylinConfig.getInstanceFromEnv(); val dataflowManager = NDataflowManager.getInstance(config, "streaming_test"); var dataflow = dataflowManager.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d"); - Assert.assertTrue(dataflow.isStreaming() && dataflow.getModel().isFusionModel()); + Assertions.assertTrue(dataflow.isStreaming() && dataflow.getModel().isFusionModel()); val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss)); kylinDataFrameManager.option("isFastBitmapEnabled", "false"); @@ -56,14 +57,15 @@ class KylinDataFrameManagerTest { ImmutableBiMap.Builder<Integer, TblColRef> dimsBuilder = ImmutableBiMap.builder(); ImmutableBiMap<Integer, TblColRef> orderedDimensions = dimsBuilder.put(1, partitionTblCol).build(); Mockito.when(layoutEntity.getOrderedDimensions()).thenReturn(orderedDimensions); - val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120"); - Assert.assertEquals(1, df.columns().length); + val plan = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, + "3e560d22-b749-48c3-9f64-d4230207f120"); + Assertions.assertEquals(1, plan.output().size()); } { // condition: id == null - val df = kylinDataFrameManager.cuboidTable(dataflow, new LayoutEntity(), + val plan = kylinDataFrameManager.cuboidTable(dataflow, new LayoutEntity(), 
"3e560d22-b749-48c3-9f64-d4230207f120"); - Assert.assertEquals(0, df.columns().length); + Assertions.assertEquals(0, plan.output().size()); } { @@ -87,12 +89,13 @@ class KylinDataFrameManagerTest { ReflectionUtils.setField(field, timeRange, Long.MIN_VALUE); seg.setTimeRange(timeRange); } catch (Exception e) { - Assert.fail(e.getMessage()); + Assertions.fail(e.getMessage()); } }); }); - val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120"); - Assert.assertEquals(1, df.columns().length); + val plan = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, + "3e560d22-b749-48c3-9f64-d4230207f120"); + Assertions.assertEquals(1, plan.output().size()); } ss.stop(); } @@ -103,13 +106,14 @@ class KylinDataFrameManagerTest { val config = KylinConfig.getInstanceFromEnv(); val dataflowManager = NDataflowManager.getInstance(config, "streaming_test"); val dataflow = dataflowManager.getDataflow("cd2b9a23-699c-4699-b0dd-38c9412b3dfd"); - Assert.assertFalse(dataflow.isStreaming()); + Assertions.assertFalse(dataflow.isStreaming()); val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss)); kylinDataFrameManager.option("isFastBitmapEnabled", "false"); val layoutEntity = new LayoutEntity(); { - val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "86b5daaa-e295-4e8c-b877-f97bda69bee5"); - Assert.assertEquals(0, df.columns().length); + val plan = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, + "86b5daaa-e295-4e8c-b877-f97bda69bee5"); + Assertions.assertEquals(0, plan.output().size()); } ss.stop(); } diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala index 03e0577013..b1ce087474 100644 --- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala +++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala @@ -18,33 +18,33 @@ package org.apache.kylin.query.runtime.plan +import java.util + import org.apache.kylin.metadata.cube.model.NDataSegment import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite} import org.junit.Assert -import java.util - class SegmentEmptyTest extends SparderBaseFunSuite with SharedSparkSession with LocalMetadata { - val prunedSegment1 = null - val prunedSegment2 = new util.LinkedList[NDataSegment] - val prunedSegment3 = new util.LinkedList[NDataSegment] - prunedSegment3.add(new NDataSegment()) + val prunedSegment1 = null + val prunedSegment2 = new util.LinkedList[NDataSegment] + val prunedSegment3 = new util.LinkedList[NDataSegment] + prunedSegment3.add(new NDataSegment()) - val prunedStreamingSegment1 = null - val prunedStreamingSegment2 = new util.LinkedList[NDataSegment] - val prunedStreamingSegment3 = new util.LinkedList[NDataSegment] - prunedStreamingSegment3.add(new NDataSegment()) + val prunedStreamingSegment1 = null + val prunedStreamingSegment2 = new util.LinkedList[NDataSegment] + val prunedStreamingSegment3 = new util.LinkedList[NDataSegment] + prunedStreamingSegment3.add(new NDataSegment()) - Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment1)) - Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment2)) - Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment3)) + 
Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment1)) + Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment2)) + Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment3)) - Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment1)) - Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment2)) - Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment3)) + Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment1)) + Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment2)) + Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment3)) - Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment1)) - Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment2)) - Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment3)) + Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment1)) + Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment2)) + Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment3)) } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/sql/SparkInternalAgent.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparkInternalAgent.scala similarity index 100% rename from src/spark-project/engine-spark/src/main/scala/org/apache/spark/sql/SparkInternalAgent.scala rename to src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparkInternalAgent.scala diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala index 4d05368d6e..bd416345f2 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala @@ -18,13 +18,16 @@ package org.apache.spark.sql.datasource.storage +import java.util.concurrent.Executors +import java.util.{Objects, List => JList} +import java.{lang, util} + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.kylin.common.KapConfig import org.apache.kylin.common.util.HadoopUtil import org.apache.kylin.engine.spark.job.NSparkCubingUtil -import org.apache.kylin.engine.spark.utils.StorageUtils.findCountDistinctMeasure -import org.apache.kylin.engine.spark.utils.{Metrics, Repartitioner, StorageUtils} +import org.apache.kylin.engine.spark.utils.{Metrics, StorageUtils} import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow} import org.apache.spark.internal.Logging import org.apache.spark.sql.LayoutEntityConverter._ @@ -38,21 +41,17 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.util.ThreadUtils -import java.util.concurrent.Executors -import java.util.{Objects, List => JList} -import java.{lang, util} import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future} -case class WriteTaskStats( - 
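The SegmentEmptyTest assertions above pin down the contract of TableScanPlan.isSegmentsEmpty: the scan counts as empty only when both the pruned batch segments and the pruned streaming segments are null or contain no segments. A minimal sketch of a helper with that contract (placeholder names, not Kylin's implementation):

    import java.util.{List => JList}

    object SegmentEmptinessSketch {
      private def nullOrEmpty[T](segments: JList[T]): Boolean =
        segments == null || segments.isEmpty

      // True only when neither the batch nor the streaming side contributes a segment.
      def isSegmentsEmpty[T](prunedSegments: JList[T],
                             prunedStreamingSegments: JList[T]): Boolean =
        nullOrEmpty(prunedSegments) && nullOrEmpty(prunedStreamingSegments)
    }

This is exactly the truth table the nine assertions walk through: null or empty on both sides yields true, and a non-empty list on either side yields false.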
numPartitions: Int, - numFiles: Long, - numBytes: Long, - numRows: Long, - sourceRows: Long, - numBucket: Int, - partitionValues: JList[String]) +case class WriteTaskStats(numPartitions: Int, + numFiles: Long, + numBytes: Long, + numRows: Long, + sourceRows: Long, + numBucket: Int, + partitionValues: JList[String]) abstract class StorageStore extends Logging { @@ -62,22 +61,25 @@ abstract class StorageStore extends Logging { def setStorageListener(listener: StorageListener): Unit = storageListener = Some(listener) - def save( - layout: LayoutEntity, - outputPath: Path, - kapConfig: KapConfig, - dataFrame: DataFrame): WriteTaskStats + def save(layout: LayoutEntity, + outputPath: Path, + kapConfig: KapConfig, + dataFrame: DataFrame): WriteTaskStats - def read( - dataflow: NDataflow, layout: LayoutEntity, sparkSession: SparkSession, - extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame + def read(dataflow: NDataflow, + layout: LayoutEntity, + sparkSession: SparkSession, + extraOptions: Map[String, String] = Map.empty[String, String]): LogicalPlan - def readSpecialSegment( - segment: NDataSegment, layout: LayoutEntity, sparkSession: SparkSession, - extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame + def readSpecialSegment(segment: NDataSegment, + layout: LayoutEntity, + sparkSession: SparkSession, + extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame - def readSpecialSegment( - segment: NDataSegment, layout: LayoutEntity, partitionId: java.lang.Long, sparkSession: SparkSession): DataFrame + def readSpecialSegment(segment: NDataSegment, + layout: LayoutEntity, + partitionId: java.lang.Long, + sparkSession: SparkSession): DataFrame def collectFileCountAndSizeAfterSave(outputPath: Path, conf: Configuration): (Long, Long) = { val fs = outputPath.getFileSystem(conf) @@ -125,23 +127,22 @@ class StorageStoreV1 extends StorageStore { LayoutFormatWriter.write(afterReplaced, layoutEntity, outputPath, kapConfig, storageListener) } - override def read(dataflow: NDataflow, layout: LayoutEntity, sparkSession: SparkSession, - extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame = { + extraOptions: Map[String, String] = Map.empty[String, String]): LogicalPlan = { val structType = if ("true".equals(extraOptions.apply("isFastBitmapEnabled"))) { layout.toExactlySchema() } else { layout.toSchema() } val indexCatalog = new FilePruner(sparkSession, options = extraOptions, structType) - sparkSession.baseRelationToDataFrame( - HadoopFsRelation( - indexCatalog, - partitionSchema = indexCatalog.partitionSchema, - dataSchema = indexCatalog.dataSchema.asNullable, - bucketSpec = None, - new ParquetFileFormat, - options = extraOptions)(sparkSession)) + val fsRelation = HadoopFsRelation( + indexCatalog, + partitionSchema = indexCatalog.partitionSchema, + dataSchema = indexCatalog.dataSchema.asNullable, + bucketSpec = None, + new ParquetFileFormat, + options = extraOptions)(sparkSession) + LogicalRelation(fsRelation) } override def readSpecialSegment( diff --git a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/JobSupport.scala b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/JobSupport.scala index 5b0d01f6fb..66192a14ca 100644 --- a/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/JobSupport.scala +++ b/src/spark-project/spark-it/src/test/scala/org/apache/kylin/common/JobSupport.scala @@ -47,6 +47,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite} import 
scala.collection.JavaConverters._ + trait JobSupport extends BeforeAndAfterAll with BeforeAndAfterEach @@ -147,9 +148,9 @@ trait JobSupport @throws[Exception] protected def buildSegment(cubeName: String, - segmentRange: SegmentRange[_ <: Comparable[_]], - toBuildLayouts: java.util.Set[LayoutEntity], - prj: String): NDataSegment = { + segmentRange: SegmentRange[_ <: Comparable[_]], + toBuildLayouts: java.util.Set[LayoutEntity], + prj: String): NDataSegment = { val config: KylinConfig = KylinConfig.getInstanceFromEnv val dsMgr: NDataflowManager = NDataflowManager.getInstance(config, prj) val execMgr: NExecutableManager =
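With StorageStoreV1.read now returning LogicalRelation(fsRelation) instead of sparkSession.baseRelationToDataFrame(...), callers that still need a DataFrame must wrap the plan themselves. In Spark that wrapping goes through Dataset.ofRows, which is private[sql] and therefore only reachable from code living in the org.apache.spark.sql package; this is presumably the role of the relocated SparkInternalAgent, whose actual API is not shown in this patch. A hedged sketch of such a bridge, using a hypothetical helper object:

    package org.apache.spark.sql

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Hypothetical helper, not SparkInternalAgent's real interface: it only
    // demonstrates where a plan produced by StorageStore.read or cuboidTable
    // can be turned back into a DataFrame when one is required.
    object LogicalPlanBridgeSketch {
      def toDataFrame(spark: SparkSession, plan: LogicalPlan): DataFrame =
        Dataset.ofRows(spark, plan)
    }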