This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 04e216994b9c03e2f1eee44eab3427ab6b46d619
Author: Zhichao Zhang <zhan...@apache.org>
AuthorDate: Thu Mar 11 15:00:26 2021 +0800

    KYLIN-4927 Forbid to use AE when building Global Dict
---
 .../features/step_impl/auto_config/auto_config.py  | 17 ++++++++
 .../features/step_impl/happy_path/happy_path.py    | 17 ++++++++
 .../features/step_impl/project_model/model.py      | 17 ++++++++
 .../features/step_impl/project_model/project.py    | 17 ++++++++
 .../apache/spark/sql/common/LocalMetadata.scala    | 10 +----
 .../spark/builder/CubeDictionaryBuilder.scala      | 18 +++++++-
 .../engine/spark/builder/TestGlobalDictBuild.scala | 50 ++++++++++++----------
 7 files changed, 113 insertions(+), 33 deletions(-)

diff --git a/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py b/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py
index 30ad76f..3cab963 100644
--- a/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py
+++ b/build/CI/kylin-system-testing/features/step_impl/auto_config/auto_config.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from getgauge.python import step
 import os
 import json
diff --git a/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py b/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py
index 1d59e05..57e64d1 100644
--- a/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py
+++ b/build/CI/kylin-system-testing/features/step_impl/happy_path/happy_path.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from getgauge.python import step
 import os
 import json
diff --git a/build/CI/kylin-system-testing/features/step_impl/project_model/model.py b/build/CI/kylin-system-testing/features/step_impl/project_model/model.py
index f159d93..2bc842c 100644
--- a/build/CI/kylin-system-testing/features/step_impl/project_model/model.py
+++ b/build/CI/kylin-system-testing/features/step_impl/project_model/model.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from getgauge.python import step
 import os
 import json
diff --git a/build/CI/kylin-system-testing/features/step_impl/project_model/project.py b/build/CI/kylin-system-testing/features/step_impl/project_model/project.py
index 61a088f..9d46725 100644
--- a/build/CI/kylin-system-testing/features/step_impl/project_model/project.py
+++ b/build/CI/kylin-system-testing/features/step_impl/project_model/project.py
@@ -1,3 +1,20 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from getgauge.python import step
 import os
 import json
diff --git a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
index df2cc68..ab6b182 100644
--- a/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
+++ b/kylin-spark-project/kylin-spark-common/src/test/java/org/apache/spark/sql/common/LocalMetadata.scala
@@ -49,14 +49,6 @@ trait LocalMetadata extends BeforeAndAfterAll with BeforeAndAfterEach {
   }

   def cleanAfterClass(): Unit = {
-    val directory = new File(LocalFileMetadataTestCase.LOCALMETA_TEMP_DATA)
-    try
-      FileUtils.deleteDirectory(directory)
-    catch {
-      case e: IOException =>
-        if (directory.exists && directory.list.length > 0) throw new IllegalStateException("Can't delete directory " + directory, e)
-    }
-    System.clearProperty(KylinConfig.KYLIN_CONF)
-    KylinConfig.destroyInstance()
+    LocalFileMetadataTestCase.cleanAfterClass();
   }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
index 0b8ca3d..5c15f7d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
@@ -44,9 +44,18 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],

   @throws[IOException]
   def buildDictSet(): Unit = {
+    // Set 'spark.sql.adaptive.enabled' to false if the value of it is true.
+    // Because when 'spark.sql.adaptive.enabled' is true, it will change the partition number
+    // dynamically and lead to wrong Global Dictionary results.
+    val aeOriginalValue = ss.conf.get("spark.sql.adaptive.enabled", "false").toBoolean
+    if (aeOriginalValue) {
+      ss.conf.set("spark.sql.adaptive.enabled", false);
+    }
     logInfo(s"Start building global dictionaries V2 for seg $seg")
     val m = s"Build global dictionaries V2 for seg $seg succeeded"
     time(m, colRefSet.asScala.foreach(col => safeBuild(col)))
+    // set the original value to 'spark.sql.adaptive.enabled'
+    ss.conf.set("spark.sql.adaptive.enabled", aeOriginalValue);
   }

   @throws[IOException]
@@ -55,7 +64,7 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
     lock.lock(getLockPath(sourceColumn), Long.MaxValue)
     try
       if (lock.lock(getLockPath(sourceColumn))) {
-        val dictColDistinct = dataset.select(wrapCol(ref)).distinct
+        val dictColDistinct = dataset.select(CubeDictionaryBuilder.wrapCol(ref)).distinct
         ss.sparkContext.setJobDescription("Calculate bucket size " + ref.identity)
         val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(seg, ref, dictColDistinct)
         val m = s"Build global dictionaries V2 for column $sourceColumn succeeded"
@@ -66,6 +75,8 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],

   @throws[IOException]
   private[builder] def build(ref: ColumnDesc, bucketPartitionSize: Int, afterDistinct: Dataset[Row]): Unit = {
+    assert(!ss.conf.get("spark.sql.adaptive.enabled", "false").toBoolean,
+      "Parameter 'spark.sql.adaptive.enabled' must be false when building global dictionary.")
     val columnName = ref.identity

     logInfo(s"Start building global dictionaries V2 for column $columnName.")
@@ -88,9 +99,12 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],

   private def getLockPath(pathName: String) = s"/${seg.project}${HadoopUtil.GLOBAL_DICT_STORAGE_ROOT}/$pathName/lock"

+}
+
+object CubeDictionaryBuilder {
+
   def wrapCol(ref: ColumnDesc): Column = {
     val colName = NSparkCubingUtil.convertFromDot(ref.identity)
     expr(colName).cast(StringType)
   }
-
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
index a0b33dc..0ed26cf 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.RandomStringUtils
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.DateFormat
 import org.apache.kylin.cube.{CubeInstance, CubeManager, CubeSegment}
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil
 import org.apache.kylin.engine.spark.metadata.{ColumnDesc, MetadataConverter, SegmentInfo}
 import org.apache.kylin.job.engine.JobEngineConfig
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler
@@ -36,8 +37,8 @@ import org.apache.spark.dict.{NGlobalDictMetaInfo, NGlobalDictionary}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.types.StructType
 import org.junit.Assert

 import scala.collection.JavaConverters.setAsJavaSetConverter
@@ -69,63 +70,67 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi
     // When to resize the dictionary, please refer to the description of DictionaryBuilderHelper.calculateBucketSize

+    val dictCol = dictColSet.iterator().next()
     // First build dictionary, no dictionary file exists
-    var randomDataSet = generateOriginData(1000, 21)
+    var randomDataSet = generateOriginData(dictCol, 1000, 21)
     val meta1 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(20, meta1.getBucketSize)
     Assert.assertEquals(1000, meta1.getDictCount)

     // apply rule #1
-    randomDataSet = generateOriginData(3000, 22)
+    randomDataSet = generateOriginData(dictCol, 3000, 22)
     val meta2 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(60, meta2.getBucketSize)
     Assert.assertEquals(4000, meta2.getDictCount)

-    randomDataSet = generateOriginData(3000, 23)
+    randomDataSet = generateOriginData(dictCol, 3000, 23)
     val meta3 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(60, meta3.getBucketSize)
     Assert.assertEquals(7000, meta3.getDictCount)

     // apply rule #2
-    randomDataSet = generateOriginData(200, 24)
+    randomDataSet = generateOriginData(dictCol, 200, 24)
     val meta4 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(140, meta4.getBucketSize)
     Assert.assertEquals(7200, meta4.getDictCount)

     // apply rule #3
-    randomDataSet = generateHotOriginData(200, 140)
+    randomDataSet = generateHotOriginData(dictCol, 200, 140)
     val meta5 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(140, meta5.getBucketSize)
     Assert.assertEquals(7400, meta5.getDictCount)

     // apply rule #3
-    randomDataSet = generateOriginData(200, 25)
+    spark.conf.set("spark.sql.adaptive.enabled", false)
+    randomDataSet = generateOriginData(dictCol, 200, 25)
     val meta6 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(280, meta6.getBucketSize)
     Assert.assertEquals(7600, meta6.getDictCount)
+    Assert.assertFalse(spark.conf.get("spark.sql.adaptive.enabled").toBoolean)

-    randomDataSet = generateOriginData(2000, 26)
+    spark.conf.set("spark.sql.adaptive.enabled", true)
+    randomDataSet = generateOriginData(dictCol, 2000, 26)
     val meta7 = buildDict(segInfo, seg, randomDataSet, dictColSet)
     Assert.assertEquals(280, meta7.getBucketSize)
     Assert.assertEquals(9600, meta7.getDictCount)
+    Assert.assertTrue(spark.conf.get("spark.sql.adaptive.enabled").toBoolean)

     DefaultScheduler.destroyInstance()
   }

-  def buildDict(segInfo: SegmentInfo, seg: CubeSegment, randomDataSet: Dataset[Row], dictColSet: Set[ColumnDesc]): NGlobalDictMetaInfo = {
+  def buildDict(segInfo: SegmentInfo, seg: CubeSegment, randomDataSet: Dataset[Row],
+                dictColSet: Set[ColumnDesc]): NGlobalDictMetaInfo = {
     val dictionaryBuilder = new CubeDictionaryBuilder(randomDataSet, segInfo, randomDataSet.sparkSession, dictColSet)
-    val col = dictColSet.iterator().next()
-    val ds = randomDataSet.select("26").distinct()
-    val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(segInfo, col, ds)
-    dictionaryBuilder.build(col, bucketPartitionSize, ds)
-    val dict = new NGlobalDictionary(seg.getProject, col.tableName, col.columnName,
-      seg.getConfig.getHdfsWorkingDirectory)
+    dictionaryBuilder.buildDictSet()
+    val columnDesc = dictColSet.iterator().next()
+    val dict = new NGlobalDictionary(seg.getProject, columnDesc.tableName,
+      columnDesc.columnName, seg.getConfig.getHdfsWorkingDirectory)
     dict.getMetaInfo
   }

-  def generateOriginData(count: Int, length: Int): Dataset[Row] = {
+  def generateOriginData(colDesc: ColumnDesc, count: Int, length: Int): Dataset[Row] = {
     var schema = new StructType
-
-    schema = schema.add("26", StringType)
+    val colName = NSparkCubingUtil.convertFromDot(colDesc.identity)
+    schema = schema.add(colName, colDesc.dataType)
     var set = new mutable.LinkedHashSet[Row]
     while (set.size != count) {
       val objects = new Array[String](1)
@@ -136,11 +141,12 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi
     spark.createDataFrame(spark.sparkContext.parallelize(set.toSeq), schema)
   }

-  def generateHotOriginData(threshold: Int, bucketSize: Int): Dataset[Row] = {
+  def generateHotOriginData(colDesc: ColumnDesc, threshold: Int, bucketSize: Int): Dataset[Row] = {
     var schema = new StructType
-    schema = schema.add("26", StringType)
-    var ds = generateOriginData(threshold * bucketSize * 2, 30)
-    ds = ds.repartition(bucketSize, col("26"))
+    val colName = NSparkCubingUtil.convertFromDot(colDesc.identity)
+    schema = schema.add(colName, colDesc.dataType)
+    var ds = generateOriginData(colDesc, threshold * bucketSize * 2, 30)
+    ds = ds.repartition(bucketSize, col(colName))
       .mapPartitions { iter =>
         val partitionID = TaskContext.get().partitionId()
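Note for readers following KYLIN-4927: the sketch below restates the save/disable/restore handling of 'spark.sql.adaptive.enabled' from buildDictSet() as a standalone Spark snippet. It is illustrative only and not part of the patch; the object and method names (AdaptiveExecutionGuard, withAdaptiveExecutionDisabled) and the try/finally wrapper are assumptions, whereas the patch resets the flag inline after time(...) returns.

    import org.apache.spark.sql.SparkSession

    // Illustrative sketch only (not from the patch): temporarily force Adaptive
    // Execution off around a partition-sensitive job, then restore the caller's
    // original setting.
    object AdaptiveExecutionGuard {

      // Runs `body` with 'spark.sql.adaptive.enabled' set to false; the original
      // value is restored afterwards, even if `body` throws.
      def withAdaptiveExecutionDisabled[T](ss: SparkSession)(body: => T): T = {
        val key = "spark.sql.adaptive.enabled"
        val original = ss.conf.get(key, "false").toBoolean
        if (original) {
          ss.conf.set(key, false)
        }
        try {
          body
        } finally {
          ss.conf.set(key, original)
        }
      }
    }

Usage would look like AdaptiveExecutionGuard.withAdaptiveExecutionDisabled(ss) { /* build global dictionaries */ }. The point of the pattern, as the patch's comment explains, is that Adaptive Execution can change the partition number dynamically, which breaks the fixed bucket layout the global dictionary build relies on.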