This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new dfd012f  KYLIN-4945 repartition encoded dataset to avoid data skew caused by single column
dfd012f is described below

commit dfd012f45b9740ae0598041d7f1326e3a58c0da7
Author: zhengshengjun <shengjun_zh...@sina.com>
AuthorDate: Wed Mar 24 21:16:58 2021 +0800

    KYLIN-4945 repartition encoded dataset to avoid data skew caused by single column
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 21 +++++++++++++++++
 .../engine/spark/builder/CubeTableEncoder.scala    | 17 ++++++++++++++
 .../kylin/engine/spark/metadata/MetaData.scala     |  1 +
 .../engine/spark/metadata/MetadataConverter.scala  | 27 +++++++++++-----------
 4 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 34f8ce9..9b17dc1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3104,4 +3104,25 @@ public abstract class KylinConfigBase implements Serializable {
     public int getMaxParentDatasetPersistCount() {
         return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
     }
+
+    public int getRepartitionNumAfterEncode() {
+        return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
+    }
+
+    /***
+     * A global dictionary is split into several buckets. To encode a column to int values more
+     * efficiently, the source dataset is repartitioned by the to-be-encoded column into the same
+     * number of partitions as the dictionary has buckets.
+     *
+     * This sometimes has a side effect: repartitioning by a single column is more likely to cause
+     * serious data skew, so one task may take the majority of the time in the first layer's cuboid building.
+     *
+     * When faced with this case, you can try repartitioning the encoded dataset by all
+     * RowKey columns to avoid the skew. The repartition size defaults to the max bucket
+     * size of all dictionaries, but you can also set it to another value via this option:
+     * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
+     ***/
+    public boolean rePartitionEncodedDatasetWithRowKey() {
+        return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+    }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
index e4a77c3..1460632 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
@@ -43,11 +43,15 @@ object CubeTableEncoder extends Logging {
     val bucketThreshold = seg.kylinconf.getGlobalDictV2ThresholdBucketSize
     val minBucketSize: Long = sourceCnt / bucketThreshold
 
+    var repartitionSizeAfterEncode = 0;
     cols.asScala.foreach(
       ref => {
         val globalDict = new NGlobalDictionary(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
         val bucketSize = globalDict.getBucketSizeOrDefault(seg.kylinconf.getGlobalDictV2MinHashPartitions)
         val enlargedBucketSize = (((minBucketSize / bucketSize) + 1) * bucketSize).toInt
+        if (enlargedBucketSize > repartitionSizeAfterEncode) {
+          repartitionSizeAfterEncode = enlargedBucketSize;
+        }
 
         val encodeColRef = convertFromDot(ref.identity)
         val columnIndex = structType.fieldIndex(encodeColRef)
@@ -63,7 +67,20 @@ object CubeTableEncoder extends Logging {
           .select(columns: _*)
       }
    )
+
     ds.sparkSession.sparkContext.setJobDescription(null)
+
+    //repartitioning by a single column during the dict-encode step above can easily cause data skew; add a step to avoid such cases.
+    if (!cols.isEmpty && seg.kylinconf.rePartitionEncodedDatasetWithRowKey) {
+      val colsInDS = partitionedDs.schema.map(_.name)
+      val rowKeyColRefs = seg.allRowKeyCols.map(colDesc => convertFromDot(colDesc.identity)).filter(colsInDS.contains).map(col)
+      //if not set in config, use the largest partition num seen during the dict-encode step
+      if (seg.kylinconf.getRepartitionNumAfterEncode > 0) {
+        repartitionSizeAfterEncode = seg.kylinconf.getRepartitionNumAfterEncode;
+      }
+      logInfo(s"repartition encoded dataset to $repartitionSizeAfterEncode partitions to avoid data skew")
+      partitionedDs = partitionedDs.repartition(repartitionSizeAfterEncode, rowKeyColRefs.toArray: _*)
+    }
     partitionedDs
   }
 }
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
index e81aa8e..030834b 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
@@ -111,6 +111,7 @@ case class SegmentInfo(id: String,
                        allDictColumns: Set[ColumnDesc],
                        partitionExp: String,
                        filterCondition: String,
+                       allRowKeyCols: List[ColumnDesc],
                        var snapshotInfo: Map[String, String] = Map.empty[String, String]) {
 
   def updateLayout(layoutEntity: LayoutEntity): Unit = {
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
index 0d12c25..a50a835 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
@@ -37,7 +37,7 @@ import scala.collection.mutable
 object MetadataConverter {
   def getSegmentInfo(cubeInstance: CubeInstance, segmentId: String, segmentName: String,
                      identifier: String): SegmentInfo = {
-    val allColumnDesc = extractAllColumnDesc(cubeInstance)
+    val (allColumnDesc, allRowKeyCols) = extractAllColumnDesc(cubeInstance)
     val (layoutEntities, measure) = extractEntityAndMeasures(cubeInstance)
     val dictColumn = measure.values.filter(_.returnType.dataType.equals("bitmap"))
       .map(_.pra.head).toSet
@@ -47,7 +47,8 @@ object MetadataConverter {
       dictColumn,
       dictColumn,
       extractPartitionExp(cubeInstance.getSegmentById(segmentId)),
-      extractFilterCondition(cubeInstance.getSegmentById(segmentId)))
+      extractFilterCondition(cubeInstance.getSegmentById(segmentId)),
+      allRowKeyCols.asScala.values.toList)
   }
 
   def getCubeUpdate(segmentInfo: SegmentInfo): CubeUpdate = {
@@ -94,25 +95,25 @@ object MetadataConverter {
       tb.getColumns.asScala.map(ref => toColumnDesc(ref = ref)).toList, tb.getAlias,
       tb.getTableDesc.getSourceType, addInfo)
   }
-  def extractAllColumnDesc(cubeInstance: CubeInstance): java.util.LinkedHashMap[Integer, ColumnDesc] = {
-    val dimensionIndex = new util.LinkedHashMap[Integer, ColumnDesc]()
+  def extractAllColumnDesc(cubeInstance: CubeInstance): (java.util.LinkedHashMap[Integer, ColumnDesc],
+    java.util.LinkedHashMap[Integer, ColumnDesc]) = {
+    //use a LinkedHashMap to keep RowKey columns in the same order as their bit index
+    val dimensions = new util.LinkedHashMap[Integer, ColumnDesc]()
     val columns = cubeInstance.getDescriptor
       .getRowkey
       .getRowKeyColumns
     val dimensionMapping = columns
      .map(co => (co.getColRef, co.getBitIndex))
     val set = dimensionMapping.map(_._1).toSet
-    val refs = cubeInstance.getAllColumns.asScala.diff(set)
+    val measureCols = cubeInstance.getAllColumns.asScala.diff(set)
       .zipWithIndex
       .map(tp => (tp._1, tp._2 + dimensionMapping.length))
-
-    val columnIDTuples = dimensionMapping ++ refs
-    val colToIndex = columnIDTuples.toMap
-    columnIDTuples
-      .foreach { co =>
-        dimensionIndex.put(co._2, toColumnDesc(co._1, co._2, set.contains(co._1)))
-      }
-    dimensionIndex
+    dimensionMapping.foreach(co => dimensions.put(co._2, toColumnDesc(co._1, co._2, true)))
+    val allColumns = new util.LinkedHashMap[Integer, ColumnDesc]()
+    //keep RowKey columns before measure columns in the LinkedHashMap
+    allColumns.putAll(dimensions)
+    measureCols.foreach(co => allColumns.put(co._2, toColumnDesc(co._1, co._2, false)))
+    (allColumns, dimensions)
   }
 
   def toLayoutEntity(cubeInstance: CubeInstance, cuboid: Cuboid): LayoutEntity = {
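
Usage sketch (not part of the diff above): the two options introduced in KylinConfigBase are read through getOptional, so they would typically be set in kylin.properties. This is only an illustration; the value 500 is an arbitrary example, and leaving the second key at its default of 0 falls back to the max bucket size of all dictionaries.

    # repartition the encoded dataset by all RowKey columns instead of a single column (default: false)
    kylin.engine.spark.repartition.encoded.dataset=true
    # optional override for the partition count after encoding (default: 0 = max dictionary bucket size)
    kylin.engine.spark.dataset.repartition.num.after.encoding=500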