This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5_beta in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 0b0471b22ef3f627d9affc4c7887da65577d40eb Author: Pengfei Zhan <dethr...@gmail.com> AuthorDate: Sat Apr 15 15:27:32 2023 +0800 KYLIN-5649 prefer AggIndex works when computed column appears in the index rewrite the projects of OlapProjectRel --- .../NAggIndexPriorityAnswerWithCCExprTest.java | 102 +++++++++++++ .../java/org/apache/kylin/util/ExecAndComp.java | 24 +-- .../aggindex_priority_answer_withccexpr.json | 18 +++ .../8dc395fb-f201-310e-feaa-d4000a7e16cb.json | 11 ++ .../8dc395fb-f201-310e-feaa-d4000a7e16cb.json | 85 +++++++++++ .../8dc395fb-f201-310e-feaa-d4000a7e16cb.json | 161 +++++++++++++++++++++ .../table/SSB.DATES.json | 113 +++++++++++++++ .../apache/kylin/query/relnode/KapProjectRel.java | 57 +++++++- .../kylin/query/relnode/OLAPAggregateRel.java | 9 +- .../apache/kylin/query/relnode/OLAPProjectRel.java | 4 +- .../org/apache/kylin/query/routing/Candidate.java | 6 + .../kylin/query/routing/RealizationChooser.java | 13 +- .../routing/RemoveIncapableRealizationsRule.java | 31 ++-- .../kylin/query/routing/CandidateSortTest.java | 6 +- .../kyligence/kap/secondstorage/tdvt/TDVTTest.java | 21 ++- .../kylin/query/pushdown/SparkSqlClient.scala | 3 +- .../org/apache/spark/sql/SparderTypeUtil.scala | 7 +- .../sql/execution/datasource/LayoutFileIndex.scala | 109 -------------- 18 files changed, 618 insertions(+), 162 deletions(-) diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggIndexPriorityAnswerWithCCExprTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggIndexPriorityAnswerWithCCExprTest.java new file mode 100644 index 0000000000..e0ab0884f3 --- /dev/null +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/NAggIndexPriorityAnswerWithCCExprTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.newten; + +import java.util.List; +import java.util.Map; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; +import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; +import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.cube.model.NIndexPlanManager; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.query.routing.Candidate; +import org.apache.kylin.query.routing.QueryLayoutChooser; +import org.apache.kylin.query.routing.RemoveIncapableRealizationsRule; +import org.apache.kylin.util.OlapContextTestUtil; +import org.apache.spark.sql.SparderEnv; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class NAggIndexPriorityAnswerWithCCExprTest extends NLocalWithSparkSessionTest { + + @Override + public String getProject() { + return "aggindex_priority_answer_withccexpr"; + } + + @Before + public void setup() throws Exception { + overwriteSystemProp("kylin.query.use-tableindex-answer-non-raw-query", "true"); + overwriteSystemProp("kylin.query.layout.prefer-aggindex", "true"); + this.createTestMetadata("src/test/resources/ut_meta/aggindex_priority_answer_withccexpr"); + + NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject()); + scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv())); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + } + + @After + public void after() throws Exception { + NDefaultScheduler.destroyInstance(); + cleanupTestMetadata(); + } + + @Test + public void testAggIndexPriorityAnswerWithCcExpr() throws Exception { + String modelId = "8dc395fb-f201-310e-feaa-d4000a7e16cb"; + NIndexPlanManager indexMgr = NIndexPlanManager.getInstance(getTestConfig(), getProject()); + List<LayoutEntity> layouts = indexMgr.getIndexPlan(modelId).getAllLayouts(); + indexDataConstructor.buildIndex(modelId, SegmentRange.TimePartitionedSegmentRange.createInfinite(), + Sets.newLinkedHashSet(layouts), true); + populateSSWithCSVData(getTestConfig(), getProject(), SparderEnv.getSparkSession()); + + String sql = "select D_YEAR,count(CASE WHEN b IN ('1') THEN 1 ELSE NULL END) " + + "from (select D_YEAR,D_DAYOFWEEK b from SSB.DATES) group by D_YEAR"; + + NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), getProject()).getDataflow(modelId); + OLAPContext context = OlapContextTestUtil.getOlapContexts(getProject(), sql).get(0); + + Map<String, String> sqlAlias2ModelName = OlapContextTestUtil.matchJoins(dataflow.getModel(), context); + context.fixModel(dataflow.getModel(), sqlAlias2ModelName); + NLayoutCandidate layoutCandidate = QueryLayoutChooser.selectLayoutCandidate(dataflow, + dataflow.getQueryableSegments(), context.getSQLDigest(), null); + assert layoutCandidate != null; + Assert.assertTrue(layoutCandidate.getCapabilityResult().isCapable()); + + RemoveIncapableRealizationsRule removeIncapableRealizationsRule = new RemoveIncapableRealizationsRule(); + Candidate candidate = new Candidate(dataflow.getRealizations().get(0), context, sqlAlias2ModelName); + candidate.setPrunedSegments(dataflow.getQueryableSegments(), dataflow); + removeIncapableRealizationsRule.apply(candidate); + Assert.assertTrue(candidate.getCapability().isCapable()); + NLayoutCandidate selectedCandidate = (NLayoutCandidate) candidate.getCapability().getSelectedCandidate(); + Assert.assertNotNull(selectedCandidate.getLayoutEntity().getIndex().getLayout(50001)); + } +} diff --git a/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java b/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java index c46b14a438..833c481d02 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/util/ExecAndComp.java @@ -40,10 +40,14 @@ import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.query.StructField; import org.apache.kylin.query.engine.QueryExec; import org.apache.kylin.query.engine.data.QueryResult; +import org.apache.kylin.query.pushdown.SparkSqlClient; import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.query.util.PushDownUtil; import org.apache.kylin.query.util.QueryParams; @@ -54,9 +58,6 @@ import org.apache.spark.sql.SparderEnv; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.SparderTypeUtil; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Sets; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -190,22 +191,11 @@ public class ExecAndComp { } catch (Exception e) { log.warn("persist {} failed", sqlPath, e); } - val rows = ds.collectAsList(); val structs = Arrays.stream(ds.schema().fields()).map(SparderTypeUtil::convertSparkFieldToJavaField) .collect(Collectors.toList()); - return new QueryResult(rows.stream().map(r -> { - List<String> result = Lists.newArrayList(); - for (int i = 0; i < r.size(); i++) { - val structField = structs.get(i); - val node = r.get(i); - if (node == null) { - result.add(null); - } else { - result.add(node.toString()); - } - } - return result; - }).collect(Collectors.toList()), rows.size(), structs); + val dsIter = ds.toIterator(); + Iterable<List<String>> listIter = SparkSqlClient.readPushDownResultRow(dsIter._1(), false); + return new QueryResult(Lists.newArrayList(listIter), (int) dsIter._2(), structs); } public static String removeDataBaseInSql(String originSql) { diff --git a/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/_global/project/aggindex_priority_answer_withccexpr.json b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/_global/project/aggindex_priority_answer_withccexpr.json new file mode 100644 index 0000000000..af27affb22 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/_global/project/aggindex_priority_answer_withccexpr.json @@ -0,0 +1,18 @@ +{ + "uuid" : "78d326e5-e194-aa38-781c-f328226a8952", + "last_modified" : 1676345088349, + "create_time" : 1673592427751, + "version" : "4.0.0.0", + "name" : "project1", + "owner" : "ADMIN", + "status" : "ENABLED", + "create_time_utc" : 1673592427751, + "default_database" : "DEFAULT", + "description" : "", + "principal" : null, + "keytab" : null, + "maintain_model_type" : "MANUAL_MAINTAIN", + "override_kylin_properties" : { + "kylin.source.default" : "9" + } +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/dataflow/8dc395fb-f201-310e-feaa-d4000a7e16cb.json b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/dataflow/8dc395fb-f201-310e-feaa-d4000a7e16cb.json new file mode 100644 index 0000000000..2dad7ae769 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/dataflow/8dc395fb-f201-310e-feaa-d4000a7e16cb.json @@ -0,0 +1,11 @@ +{ + "uuid" : "8dc395fb-f201-310e-feaa-d4000a7e16cb", + "last_modified" : 1678518305320, + "create_time" : 1678511175972, + "version" : "4.0.0.0", + "status" : "ONLINE", + "last_status" : null, + "cost" : 50, + "query_hit_count" : 5, + "last_query_time" : 1678517094673 +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/index_plan/8dc395fb-f201-310e-feaa-d4000a7e16cb.json b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/index_plan/8dc395fb-f201-310e-feaa-d4000a7e16cb.json new file mode 100644 index 0000000000..e603f253e0 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/index_plan/8dc395fb-f201-310e-feaa-d4000a7e16cb.json @@ -0,0 +1,85 @@ +{ + "uuid" : "8dc395fb-f201-310e-feaa-d4000a7e16cb", + "last_modified" : 1678512309327, + "create_time" : 1676951355356, + "version" : "4.0.0.0", + "description" : null, + "rule_based_index" : { + "dimensions" : [ 10 ], + "measures" : [ 100000, 100001 ], + "global_dim_cap" : null, + "aggregation_groups" : [ { + "includes" : [ 10 ], + "measures" : [ 100000, 100001 ], + "select_rule" : { + "hierarchy_dims" : [ ], + "mandatory_dims" : [ 10 ], + "joint_dims" : [ ] + }, + "index_range" : "EMPTY" + } ], + "layout_id_mapping" : [ 50001 ], + "parent_forward" : 3, + "index_start_id" : 50000, + "last_modify_time" : 1678512309324, + "layout_black_list" : [ ], + "scheduler_version" : 2, + "index_update_enabled" : true, + "base_layout_enabled" : true + }, + "indexes" : [ { + "id" : 20000010000, + "dimensions" : [ 10, 3 ], + "measures" : [ ], + "layouts" : [ { + "id" : 20000010001, + "name" : null, + "owner" : "ADMIN", + "col_order" : [ 10, 3 ], + "shard_by_columns" : [ ], + "partition_by_columns" : [ ], + "sort_by_columns" : [ ], + "storage_type" : 20, + "update_time" : 1676958110904, + "manual" : true, + "auto" : false, + "base" : false, + "draft_version" : null, + "index_range" : "EMPTY" + } ], + "next_layout_offset" : 2 + }, { + "id" : 20000020000, + "dimensions" : [ 3, 10, 17 ], + "measures" : [ ], + "layouts" : [ { + "id" : 20000020001, + "name" : null, + "owner" : "ADMIN", + "col_order" : [ 3, 10, 17 ], + "shard_by_columns" : [ ], + "partition_by_columns" : [ ], + "sort_by_columns" : [ ], + "storage_type" : 20, + "update_time" : 1677123358096, + "manual" : false, + "auto" : false, + "base" : true, + "draft_version" : null, + "index_range" : null + } ], + "next_layout_offset" : 2 + } ], + "override_properties" : { }, + "to_be_deleted_indexes" : [ ], + "auto_merge_time_ranges" : null, + "retention_range" : 0, + "engine_type" : 80, + "next_aggregation_index_id" : 60000, + "next_table_index_id" : 20000030000, + "agg_shard_by_columns" : [ ], + "extend_partition_columns" : [ ], + "layout_bucket_num" : { }, + "approved_additional_recs" : 0, + "approved_removal_recs" : 0 +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/model_desc/8dc395fb-f201-310e-feaa-d4000a7e16cb.json b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/model_desc/8dc395fb-f201-310e-feaa-d4000a7e16cb.json new file mode 100644 index 0000000000..948c3e6d3f --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/model_desc/8dc395fb-f201-310e-feaa-d4000a7e16cb.json @@ -0,0 +1,161 @@ +{ + "uuid" : "8dc395fb-f201-310e-feaa-d4000a7e16cb", + "last_modified" : 1678511175928, + "create_time" : 1676951354863, + "version" : "4.0.0.0", + "alias" : "aggindex_priority_answer_withccexpr", + "owner" : "ADMIN", + "config_last_modifier" : null, + "config_last_modified" : 0, + "description" : "", + "fact_table" : "SSB.DATES", + "fact_table_alias" : null, + "management_type" : "MODEL_BASED", + "join_tables" : [ ], + "filter_condition" : "", + "partition_desc" : null, + "capacity" : "MEDIUM", + "segment_config" : { + "auto_merge_enabled" : null, + "auto_merge_time_ranges" : null, + "volatile_range" : null, + "retention_range" : null, + "create_empty_segment_enabled" : false + }, + "data_check_desc" : null, + "semantic_version" : 0, + "storage_type" : 0, + "model_type" : "BATCH", + "all_named_columns" : [ { + "id" : 0, + "name" : "D_WEEKNUMINYEAR", + "column" : "DATES.D_WEEKNUMINYEAR" + }, { + "id" : 1, + "name" : "D_LASTDAYINWEEKFL", + "column" : "DATES.D_LASTDAYINWEEKFL" + }, { + "id" : 2, + "name" : "D_LASTDAYINMONTHFL", + "column" : "DATES.D_LASTDAYINMONTHFL" + }, { + "id" : 3, + "name" : "D_DAYOFWEEK", + "column" : "DATES.D_DAYOFWEEK", + "status" : "DIMENSION" + }, { + "id" : 4, + "name" : "D_MONTHNUMINYEAR", + "column" : "DATES.D_MONTHNUMINYEAR" + }, { + "id" : 5, + "name" : "D_YEARMONTHNUM", + "column" : "DATES.D_YEARMONTHNUM" + }, { + "id" : 6, + "name" : "D_YEARMONTH", + "column" : "DATES.D_YEARMONTH" + }, { + "id" : 7, + "name" : "D_DAYNUMINMONTH", + "column" : "DATES.D_DAYNUMINMONTH" + }, { + "id" : 8, + "name" : "D_SELLINGSEASON", + "column" : "DATES.D_SELLINGSEASON" + }, { + "id" : 9, + "name" : "D_WEEKDAYFL", + "column" : "DATES.D_WEEKDAYFL" + }, { + "id" : 10, + "name" : "D_YEAR", + "column" : "DATES.D_YEAR", + "status" : "DIMENSION" + }, { + "id" : 11, + "name" : "D_HOLIDAYFL", + "column" : "DATES.D_HOLIDAYFL" + }, { + "id" : 12, + "name" : "D_DAYNUMINWEEK", + "column" : "DATES.D_DAYNUMINWEEK" + }, { + "id" : 13, + "name" : "D_DAYNUMINYEAR", + "column" : "DATES.D_DAYNUMINYEAR" + }, { + "id" : 14, + "name" : "D_DATE", + "column" : "DATES.D_DATE" + }, { + "id" : 15, + "name" : "D_MONTH", + "column" : "DATES.D_MONTH" + }, { + "id" : 16, + "name" : "D_DATEKEY", + "column" : "DATES.D_DATEKEY" + }, { + "id" : 17, + "name" : "CC_1", + "column" : "DATES.CC_1", + "status" : "DIMENSION" + } ], + "all_measures" : [ { + "name" : "COUNT_ALL", + "function" : { + "expression" : "COUNT", + "parameters" : [ { + "type" : "constant", + "value" : "1" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : null, + "id" : 100000, + "type" : "NORMAL", + "internal_ids" : [ ] + }, { + "name" : "COUNT(1)", + "function" : { + "expression" : "COUNT", + "parameters" : [ { + "type" : "column", + "value" : "DATES.CC_1" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : "", + "id" : 100001, + "type" : "NORMAL", + "internal_ids" : [ ] + } ], + "recommendations_count" : 0, + "computed_columns" : [ { + "tableIdentity" : "SSB.DATES", + "tableAlias" : "DATES", + "columnName" : "CC_1", + "expression" : "CASE WHEN DATES.D_DAYOFWEEK IN ('1') THEN 1 ELSE NULL END", + "innerExpression" : "CASE WHEN `DATES`.`D_DAYOFWEEK` IN ('1') THEN 1 ELSE NULL END", + "datatype" : "INTEGER", + "comment" : null, + "rec_uuid" : null + } ], + "canvas" : { + "coordinate" : { + "DATES" : { + "x" : 691.3332790798611, + "y" : 155.33331976996527, + "width" : 200.0, + "height" : 230.0 + } + }, + "zoom" : 9.0 + }, + "multi_partition_desc" : null, + "multi_partition_key_mapping" : null, + "fusion_id" : null +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/table/SSB.DATES.json b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/table/SSB.DATES.json new file mode 100644 index 0000000000..21a1977a13 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/aggindex_priority_answer_withccexpr/metadata/aggindex_priority_answer_withccexpr/table/SSB.DATES.json @@ -0,0 +1,113 @@ +{ + "uuid" : "9a601ae5-9491-b29f-19f5-36c0b5aee0c8", + "last_modified" : 0, + "create_time" : 1676879675874, + "version" : "4.0.0.0", + "name" : "DATES", + "columns" : [ { + "id" : "1", + "name" : "D_DATEKEY", + "datatype" : "date", + "case_sensitive_name" : "d_datekey" + }, { + "id" : "2", + "name" : "D_DATE", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "d_date" + }, { + "id" : "3", + "name" : "D_DAYOFWEEK", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "d_dayofweek" + }, { + "id" : "4", + "name" : "D_MONTH", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "d_month" + }, { + "id" : "5", + "name" : "D_YEAR", + "datatype" : "integer", + "case_sensitive_name" : "d_year" + }, { + "id" : "6", + "name" : "D_YEARMONTHNUM", + "datatype" : "integer", + "case_sensitive_name" : "d_yearmonthnum" + }, { + "id" : "7", + "name" : "D_YEARMONTH", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "d_yearmonth" + }, { + "id" : "8", + "name" : "D_DAYNUMINWEEK", + "datatype" : "integer", + "case_sensitive_name" : "d_daynuminweek" + }, { + "id" : "9", + "name" : "D_DAYNUMINMONTH", + "datatype" : "integer", + "case_sensitive_name" : "d_daynuminmonth" + }, { + "id" : "10", + "name" : "D_DAYNUMINYEAR", + "datatype" : "integer", + "case_sensitive_name" : "d_daynuminyear" + }, { + "id" : "11", + "name" : "D_MONTHNUMINYEAR", + "datatype" : "integer", + "case_sensitive_name" : "d_monthnuminyear" + }, { + "id" : "12", + "name" : "D_WEEKNUMINYEAR", + "datatype" : "integer", + "case_sensitive_name" : "d_weeknuminyear" + }, { + "id" : "13", + "name" : "D_SELLINGSEASON", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "d_sellingseason" + }, { + "id" : "14", + "name" : "D_LASTDAYINWEEKFL", + "datatype" : "integer", + "case_sensitive_name" : "d_lastdayinweekfl" + }, { + "id" : "15", + "name" : "D_LASTDAYINMONTHFL", + "datatype" : "integer", + "case_sensitive_name" : "d_lastdayinmonthfl" + }, { + "id" : "16", + "name" : "D_HOLIDAYFL", + "datatype" : "integer", + "case_sensitive_name" : "d_holidayfl" + }, { + "id" : "17", + "name" : "D_WEEKDAYFL", + "datatype" : "integer", + "case_sensitive_name" : "d_weekdayfl" + } ], + "source_type" : 9, + "table_type" : "EXTERNAL", + "top" : false, + "increment_loading" : false, + "last_snapshot_path" : null, + "last_snapshot_size" : 0, + "snapshot_last_modified" : 0, + "query_hit_count" : 0, + "partition_column" : null, + "snapshot_partitions" : { }, + "snapshot_partitions_info" : { }, + "snapshot_total_rows" : 0, + "snapshot_partition_col" : null, + "selected_snapshot_partition_col" : null, + "temp_snapshot_path" : null, + "snapshot_has_broken" : false, + "database" : "SSB", + "transactional" : false, + "rangePartition" : false, + "partition_desc" : null +} \ No newline at end of file diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapProjectRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapProjectRel.java index 697a2d0dd9..7b45aec465 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapProjectRel.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/KapProjectRel.java @@ -20,6 +20,7 @@ package org.apache.kylin.query.relnode; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -39,13 +40,12 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rel.type.RelDataTypeFieldImpl; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.query.schema.OLAPTable; -import org.apache.kylin.query.util.ICutContextStrategy; - import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.query.schema.OLAPTable; +import org.apache.kylin.query.util.ICutContextStrategy; import lombok.Setter; import lombok.val; @@ -162,6 +162,7 @@ public class KapProjectRel extends OLAPProjectRel implements KapRel { || !(this.context.hasPrecalculatedFields()) || (this.getContext().isHasJoin() && this.beforeTopPreCalcJoin)) { this.columnRowType = this.buildColumnRowType(); + this.rewriteProjects(); return; } @@ -189,9 +190,57 @@ public class KapProjectRel extends OLAPProjectRel implements KapRel { // rebuild columns this.columnRowType = this.buildColumnRowType(); + this.rewriteProjects(); this.rewriting = false; } + private void rewriteProjects() { + OLAPRel olapChild = (OLAPRel) getInput(); + ColumnRowType inputColumnRowType = olapChild.getColumnRowType(); + List<TblColRef> allColumns = inputColumnRowType.getAllColumns(); + List<TblColRef> ccColRefList = allColumns.stream() // + .filter(col -> col.getColumnDesc().isComputedColumn()) // + .collect(Collectors.toList()); + + Map<TblColRef, Integer> columnToIdMap = Maps.newHashMap(); + for (int i = 0; i < allColumns.size(); i++) { + TblColRef colRef = allColumns.get(i); + if (TblColRef.UNKNOWN_ALIAS.equalsIgnoreCase(colRef.getTableAlias())) { + continue; + } else if (columnToIdMap.containsKey(colRef)) { + logger.warn("duplicate TblColRef {} of computed column.", colRef); + } + columnToIdMap.putIfAbsent(colRef, i); + } + List<RexNode> newRewriteProjList = Lists.newArrayList(); + Map<String, TblColRef> map = Maps.newHashMap(); + for (TblColRef tblColRef : ccColRefList) { + map.putIfAbsent(tblColRef.getDoubleQuoteExp(), tblColRef); + } + Map<RexNode, TblColRef> nodeAndTblColMap = new HashMap<>(); + for (int i = 0; i < this.rewriteProjects.size(); i++) { + RexNode rex = this.rewriteProjects.get(i); + RelDataTypeField columnField = this.rowType.getFieldList().get(i); + String fieldName = columnField.getName(); + Set<TblColRef> sourceCollector = Sets.newHashSet(); + TblColRef column = translateRexNode(rex, inputColumnRowType, fieldName, sourceCollector, nodeAndTblColMap); + if (column == null) + throw new IllegalStateException("No TblColRef found in " + rex); + TblColRef existColRef = map.get(column.toString()); + if (existColRef != null && getContext().allColumns.contains(existColRef)) { + column = existColRef; + List<RelDataTypeField> inputFieldList = getInput().getRowType().getFieldList(); + RelDataTypeField inputField = inputFieldList.get(columnToIdMap.get(column)); + RexNode newRef = inputField == null ? rex + : new RexInputRef(inputField.getIndex(), inputField.getType()); + newRewriteProjList.add(newRef); + } else { + newRewriteProjList.add(rex); + } + } + this.rewriteProjects = newRewriteProjList; + } + private void updateSubContexts(Set<OLAPContext> subContexts) { if (isMerelyPermutation || this.rewriting || this.afterAggregate) return; diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index ac36d4c936..5ba455c3ba 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -58,6 +58,9 @@ import org.apache.calcite.sql.type.SqlTypeUtil; import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Util; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.measure.basic.BasicMeasureType; import org.apache.kylin.measure.percentile.PercentileMeasureType; @@ -68,10 +71,6 @@ import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.query.schema.OLAPTable; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - /** */ public class OLAPAggregateRel extends Aggregate implements OLAPRel { @@ -315,7 +314,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { } public boolean needRewrite() { - return this.context.realization != null && !this.afterAggregate && !context.isAnsweredByTableIndex(); + return this.context.realization != null && !this.afterAggregate; } @Override diff --git a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java index 74ab0a38bb..ed6a693519 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java @@ -91,9 +91,9 @@ public class OLAPProjectRel extends Project implements OLAPRel { } /** - * Since the project under aggregate maybe reduce expressions by {@link org.apache.kylin.query.optrule.AggregateProjectReduceRule}, + * Since the project under aggregate maybe reduce expressions by AggregateProjectReduceRule, * consider the count of expressions into cost, the reduced project will be used. - * + * <p> * Made RexOver much more expensive so we can transform into {@link OLAPWindowRel} * by rules in {@link org.apache.calcite.rel.rules.ProjectToWindowRule} */ diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java index 462dbe9071..47813efe9a 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/Candidate.java @@ -100,6 +100,12 @@ public class Candidate { this.realization = realization; this.ctx = ctx; this.matchedJoinsGraphAliasMap = matchedJoinsGraphAliasMap; + // Initialize OlapContext's rewritten properties to avoid NPE errors + this.recordRewrittenCtxProps(); + } + + void recordRewrittenCtxProps() { + this.rewrittenCtx = RealizationChooser.preservePropsBeforeRewrite(ctx); } @Override diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java index 0613f62d72..eb82e60b35 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/RealizationChooser.java @@ -350,16 +350,11 @@ public class RealizationChooser { return null; } - candidate.setRewrittenCtx(preserveRewriteProps(olapContext)); logger.info("The realizations remaining: {}, and the final chosen one for current olap context {} is {}", candidate.realization.getCanonicalName(), olapContext.id, candidate.realization.getCanonicalName()); return candidate; } - static OLAPContextProp preserveRewriteProps(OLAPContext rewrittenOLAContext) { - return preservePropsBeforeRewrite(rewrittenOLAContext); - } - static OLAPContextProp preservePropsBeforeRewrite(OLAPContext oriOLAPContext) { OLAPContextProp preserved = new OLAPContextProp(-1); preserved.allColumns = Sets.newHashSet(oriOLAPContext.allColumns); @@ -378,9 +373,13 @@ public class RealizationChooser { static void restoreOLAPContextProps(OLAPContext oriOLAPContext, OLAPContextProp preservedOLAPContext) { oriOLAPContext.allColumns = preservedOLAPContext.allColumns; oriOLAPContext.setSortColumns(preservedOLAPContext.getSortColumns()); + // By creating a new hashMap, the containsKey method can obtain appropriate results. + // This is necessary because the aggregations have changed during the query matching process, + // therefore changed hash of the same object would be put into different bucket of the LinkedHashMap. + Map<FunctionDesc, FunctionDesc> map = Maps.newHashMap(preservedOLAPContext.getReservedMap()); oriOLAPContext.aggregations.forEach(agg -> { - if (preservedOLAPContext.getReservedMap().containsKey(agg)) { - final FunctionDesc functionDesc = preservedOLAPContext.getReservedMap().get(agg); + if (map.containsKey(agg)) { + final FunctionDesc functionDesc = map.get(agg); agg.setExpression(functionDesc.getExpression()); agg.setParameters(functionDesc.getParameters()); agg.setReturnType(functionDesc.getReturnType()); diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/RemoveIncapableRealizationsRule.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/RemoveIncapableRealizationsRule.java index 188d53e567..95c3da66fa 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/RemoveIncapableRealizationsRule.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/RemoveIncapableRealizationsRule.java @@ -25,6 +25,7 @@ import org.apache.kylin.metadata.realization.CapabilityResult; import org.apache.kylin.metadata.realization.HybridRealization; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.query.relnode.OLAPContextProp; import org.apache.kylin.query.util.ComputedColumnRewriter; import org.apache.kylin.query.util.QueryAliasMatchInfo; @@ -39,22 +40,29 @@ public class RemoveIncapableRealizationsRule extends PruningRule { if (candidate.getCapability() != null) { return; } - candidate.getCtx().resetSQLDigest(); - CapabilityResult capability = getCapabilityResult(candidate); + + // Preserve the initial OlapContext and initialize the matching result of Candidate as false. + OLAPContextProp propsBeforeRewrite = RealizationChooser.preservePropsBeforeRewrite(candidate.getCtx()); + CapabilityResult capabilityResult = new CapabilityResult(); IRealization realization = candidate.getRealization(); - if (!capability.isCapable() && !realization.getModel().getComputedColumnDescs().isEmpty()) { - log.info("{}({}/{}): try rewrite computed column and then check whether the realization is capable.", - this.getClass().getName(), realization.getProject(), realization.getCanonicalName()); - BiMap<String, String> aliasMapping = HashBiMap.create(); - aliasMapping.putAll(candidate.getMatchedJoinsGraphAliasMap()); + if (!realization.getModel().getComputedColumnDescs().isEmpty()) { + BiMap<String, String> aliasMapping = HashBiMap.create(candidate.getMatchedJoinsGraphAliasMap()); ComputedColumnRewriter.rewriteCcInnerCol(candidate.getCtx(), realization.getModel(), new QueryAliasMatchInfo(aliasMapping, null)); candidate.getCtx().resetSQLDigest(); - capability = getCapabilityResult(candidate); + capabilityResult = getCapabilityResult(candidate); + candidate.recordRewrittenCtxProps(); } - candidate.setCapability(capability); + if (!capabilityResult.isCapable()) { + RealizationChooser.restoreOLAPContextProps(candidate.getCtx(), propsBeforeRewrite); + candidate.getCtx().resetSQLDigest(); + capabilityResult = getCapabilityResult(candidate); + candidate.recordRewrittenCtxProps(); + } + + candidate.setCapability(capabilityResult); } private CapabilityResult getCapabilityResult(Candidate candidate) { @@ -67,6 +75,11 @@ public class RemoveIncapableRealizationsRule extends PruningRule { } else { capability = DataflowCapabilityChecker.check((NDataflow) realization, candidate, sqlDigest); } + + // The matching process may modify the dimensions and measures info of the OlapContext, + // so we need these properties to be recorded in the candidate's rewrittenCtx. It is important + // that once the OlapContext matched an index, no further matching will be performed. + candidate.recordRewrittenCtxProps(); return capability; } } diff --git a/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java index 530bafff85..472d7abf83 100644 --- a/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/routing/CandidateSortTest.java @@ -24,12 +24,15 @@ import java.util.List; import org.apache.kylin.common.QueryContext; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.junit.annotation.MetadataInfo; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.query.relnode.OLAPContext; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +@MetadataInfo class CandidateSortTest { @Test @@ -81,7 +84,8 @@ class CandidateSortTest { Comparator<Candidate> comparator = Candidate.realizationCostSorter(); NDataflow df1 = Mockito.mock(NDataflow.class); NDataflow df2 = Mockito.mock(NDataflow.class); - OLAPContext olapContext = Mockito.mock(OLAPContext.class); + OLAPContext olapContext = new OLAPContext(0); + olapContext.allColumns = Sets.newHashSet(); Candidate c1 = new Candidate(df1, olapContext, Maps.newHashMap()); Candidate c2 = new Candidate(df2, olapContext, Maps.newHashMap()); Mockito.when(c1.getRealization().getCost()).thenReturn(1); diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTTest.java index 9ad7cab786..3a664820a1 100644 --- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTTest.java +++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/tdvt/TDVTTest.java @@ -41,6 +41,9 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.util.Unsafe; import org.apache.kylin.engine.spark.IndexDataConstructor; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableSet; import org.apache.kylin.job.SecondStorageJobParamUtil; import org.apache.kylin.job.common.ExecutableUtil; import org.apache.kylin.job.execution.DefaultExecutable; @@ -51,7 +54,9 @@ import org.apache.kylin.job.handler.SecondStorageSegmentLoadJobHandler; import org.apache.kylin.job.model.JobParam; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.model.ComputedColumnDesc; import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.query.util.PushDownUtil; import org.apache.kylin.util.ExecAndComp; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -70,10 +75,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.testcontainers.containers.JdbcDatabaseContainer; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableSet; - import io.kyligence.kap.newten.clickhouse.ClickHouseUtils; import io.kyligence.kap.secondstorage.SecondStorageUtil; import io.kyligence.kap.secondstorage.test.ClickHouseClassRule; @@ -162,6 +163,18 @@ public class TDVTTest implements JobWaiter { dataflowManager.getDataflow(AUTO_MODEL_STAPLES_1).getSegments().stream() .map(NDataSegment::getId).collect(Collectors.toList()))); + // For historical reasons, the innerExpression of ComputedColumn is not standardized + modelManager.listAllModels().forEach(model -> { + if (model.isBroken()) { + return; + } + List<ComputedColumnDesc> ccList = model.getComputedColumnDescs(); + for (ComputedColumnDesc ccDesc : ccList) { + String innerExp = PushDownUtil.massageComputedColumn(model, model.getProject(), ccDesc, null); + ccDesc.setInnerExpression(innerExp); + } + }); + // check test.checkHttpServer(); test.overwriteSystemProp("kylin.query.use-tableindex-answer-non-raw-query", "true"); diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala index bd72f8b219..d2ba4fca42 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala @@ -164,7 +164,7 @@ object SparkSqlClient { QueryContext.current().getMetrics.setQueryStageCount(stageCount) QueryContext.current().getMetrics.setQueryTaskCount(taskCount) // return result - (readPushDownResultRow(resultRows, true), resultSize, fieldList) + (readPushDownResultRow(resultRows, checkInterrupt = true), resultSize, fieldList) } catch { case e: Throwable => if (e.isInstanceOf[InterruptedException]) { @@ -212,6 +212,7 @@ object SparkSqlClient { case value: mutable.WrappedArray.ofRef[AnyRef] => value.array.map(v => rawValueToString(v, true)).mkString("[", ",", "]") case value: immutable.Map[Any, Any] => value.map(p => rawValueToString(p._1, true) + ":" + rawValueToString(p._2, true)).mkString("{", ",", "}") + case value: Array[Byte] => new String(value) case value: Any => value.toString } } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala index 90981d3d64..79d4aaf451 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -import org.springframework.util.Base64Utils import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort} import java.math.BigDecimal @@ -297,13 +296,15 @@ object SparderTypeUtil extends Logging { case (ts: java.sql.Timestamp, _) => DateFormat.castTimestampToString(ts.getTime) case (dt: java.sql.Date, _) => DateFormat.formatToDateStr(dt.getTime) case (str: java.lang.String, _) => formatStringValue(str) - case (value: mutable.WrappedArray.ofRef[Any], _) => + case (value: mutable.WrappedArray.ofRef[AnyRef], _) => + value.array.map(v => convertToStringWithCalciteType(v, relType, true)).mkString("[", ",", "]") + case (value: mutable.WrappedArray[Any], _) => value.array.map(v => convertToStringWithCalciteType(v, relType, true)).mkString("[", ",", "]") case (value: immutable.Map[Any, Any], _) => value .map(v => convertToStringWithCalciteType(v._1, relType, true) + ":" + convertToStringWithCalciteType(v._2, relType, true)) .mkString("{", ",", "}") - case (value: Array[Byte], _) => Base64Utils.encodeToString(value) + case (value: Array[Byte], _) => new String(value) case (other, _) => other.toString } } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/LayoutFileIndex.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/LayoutFileIndex.scala deleted file mode 100644 index 2d5c2c1c00..0000000000 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/LayoutFileIndex.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -//package org.apache.spark.sql.execution.datasource -// -//import io.kyligence.kap.engine.spark.job.NSparkCubingUtil -//import io.kyligence.kap.metadata.cube.model.NDataSegment -//import org.apache.hadoop.fs.Path -//import org.apache.kylin.common.QueryContext -//import org.apache.spark.sql.execution.datasources._ -//import org.apache.spark.sql.{AnalysisException, SparkSession} -//import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalogUtils} -//import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Cast, Expression, InterpretedPredicate, Literal, Predicate} -//import org.apache.spark.sql.catalyst.InternalRow -//import org.apache.spark.sql.internal.SQLConf -//import org.apache.spark.sql.types.StructType -// -//import scala.collection.JavaConverters._ -// -//class LayoutFileIndex( -// sparkSession: SparkSession, -// catalogTable: CatalogTable, -// override val sizeInBytes: Long, -// segments: Seq[NDataSegment]) extends CatalogFileIndex(sparkSession, catalogTable, sizeInBytes) with ResetShufflePartition { -// -// private val fileStatusCache = ShardFileStatusCache.getFileStatusCache(sparkSession) -// -// lazy val paths: Seq[Path] = { -// segments -// .map(seg => new Path(NSparkCubingUtil.getStoragePath(seg, catalogTable.identifier.table.toLong))) -// } -// -// override def rootPaths: Seq[Path] = paths -// -// override def filterPartitions(partitionFilters: Seq[Expression]): InMemoryFileIndex = { -// val index = if (catalogTable.partitionColumnNames.nonEmpty) { -// val partitionSchema = catalogTable.partitionSchema -// val partitionColumnNames = catalogTable.partitionColumnNames.toSet -// val nonPartitionPruningPredicates = partitionFilters.filterNot { -// _.references.map(_.name).toSet.subsetOf(partitionColumnNames) -// } -// if (nonPartitionPruningPredicates.nonEmpty) { -// throw new AnalysisException("Expected only partition pruning predicates:" + nonPartitionPruningPredicates) -// } -// var partitionPaths = segments -// .flatMap { seg => -// val layout = seg.getLayout(catalogTable.identifier.table.toLong) -// val data = layout.getPartitionValues -// val baseDir = NSparkCubingUtil.getStoragePath(seg, layout.getLayoutId) -// data.asScala.map(dt => (dt, baseDir + "/" + dt)) -// }.map { tp => -// val spec = PartitioningUtils.parsePathFragment(tp._1) -// val row = InternalRow.fromSeq(partitionSchema.map { field => -// val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) { -// null -// } else { -// spec(field.name) -// } -// Cast(Literal(partValue), field.dataType, Option(SQLConf.get.sessionLocalTimeZone)).eval() -// }) -// PartitionPath(row, new Path(tp._2)) -// } -// if (partitionFilters.nonEmpty) { -// val boundPredicate = -// Predicate.create(partitionFilters.reduce(And).transform { -// case att: AttributeReference => -// val index = partitionSchema.indexWhere(_.name == att.name) -// BoundReference(index, partitionSchema(index).dataType, nullable = true) -// }) -// partitionPaths = partitionPaths.filter(partitionPath => boundPredicate.eval(partitionPath.values)) -// } -// new PrunedInMemoryFileIndex(sparkSession, fileStatusCache, PartitionSpec(partitionSchema, partitionPaths)) -// } else { -// new InMemoryFileIndex( -// sparkSession, rootPaths, Map.empty[String, String], userSpecifiedSchema = Option(table.schema)) -// } -// QueryContext.current().record("partition_pruning") -// setShufflePartitions($"${data}", index.allFiles().map(_.getLen).sum, sparkSession) -// QueryContext.current().record("fetch_file_status") -// -// index -// } -// -// override def partitionSchema: StructType = catalogTable.partitionSchema -//} -//private class PrunedInMemoryFileIndex( -// sparkSession: SparkSession, -// fileStatusCache: FileStatusCache, -// override val partitionSpec: PartitionSpec) -// extends InMemoryFileIndex( -// sparkSession, -// partitionSpec.partitions.map(_.path), -// Map.empty, -// Some(partitionSpec.partitionColumns), -// fileStatusCache) \ No newline at end of file