This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit a6f78cd53b951acf3ec966fa537dae3f2b82fd1e
Author: XiaoxiangYu <x...@apache.org>
AuthorDate: Mon Nov 23 21:44:23 2020 +0800

    KYLIN-4818 Calculate cuboid rowcount via HLL
---
 .../kylin/common/annotation/Clarification.java     |   6 +-
 .../java/org/apache/kylin/cube/CubeSegment.java    |  19 ++-
 .../engine/spark/application/SparkApplication.java |   2 +-
 .../engine/spark/builder/NBuildSourceInfo.java     |  16 +--
 .../kylin/engine/spark/job/CubeBuildJob.java       |  27 ++++-
 .../kylin/engine/spark/job/CuboidAggregator.scala  |   1 +
 .../kylin/engine/spark/job/CuboidStatistics.scala  | 129 +++++++++++++++++++++
 .../engine/spark/job/ParentSourceChooser.scala     |  30 ++++-
 .../spark/job/ResourceDetectBeforeCubingJob.java   |   1 +
 .../kylin/engine/spark/metadata/MetaData.scala     |  21 +++-
 .../engine/spark/metadata/MetadataConverter.scala  |  12 +-
 11 files changed, 234 insertions(+), 30 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java b/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java
index b039e50..4b0ea2f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java
+++ b/core-common/src/main/java/org/apache/kylin/common/annotation/Clarification.java
@@ -30,9 +30,11 @@ import java.lang.annotation.Target;
         ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
 public @interface Clarification {
 
-    Priority priority();
+    Priority priority() default Priority.MINOR;
 
-    String msg() default "N/A";
+    String msg() default "null";
+
+    boolean deprecated() default false; // Please remove deprecated key when Kylin4 GA
 
     enum Priority {
         MINOR,
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d0224fb..715e684 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -31,6 +31,7 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
@@ -75,8 +76,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     @JsonProperty("date_range_end")
     private long dateRangeEnd;
     @JsonProperty("source_offset_start")
+    @Clarification(deprecated = true)
     private long sourceOffsetStart;
     @JsonProperty("source_offset_end")
+    @Clarification(deprecated = true)
     private long sourceOffsetEnd;
     @JsonProperty("status")
     private SegmentStatusEnum status;
@@ -107,6 +110,7 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check
 
     @JsonProperty("dictionaries")
+    @Clarification(deprecated = true)
     private ConcurrentHashMap<String, String> dictionaries; // table/column ==> dictionary resource path
     @JsonProperty("snapshots")
     private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
@@ -116,13 +120,16 @@
 
     @JsonProperty("source_partition_offset_start")
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    @Clarification(deprecated = true)
     private Map<Integer, Long> sourcePartitionOffsetStart = Maps.newHashMap();
     @JsonProperty("source_partition_offset_end")
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    @Clarification(deprecated = true)
     private Map<Integer, Long> sourcePartitionOffsetEnd = Maps.newHashMap();
 
     @JsonProperty("stream_source_checkpoint")
+    @Clarification(deprecated = true)
     private String streamSourceCheckpoint;
 
     @JsonProperty("additionalInfo")
@@ -530,11 +537,19 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     }
 
     public String getStatisticsResourcePath() {
-        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid());
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), ".seq");
+    }
+
+    public String getPreciseStatisticsResourcePath() {
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), ".json");
     }
 
     public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
-        return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
+        return getStatisticsResourcePath(cubeName, cubeSegmentId, ".seq");
+    }
+
+    public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId, String suffix) {
+        return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + suffix;
     }
 
     @Override
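The new suffix parameter keeps the legacy sequence-file statistics and the sampled JSON statistics side by side. A minimal sketch of the resulting paths, assuming ResourceStore.CUBE_STATISTICS_ROOT resolves to "/cube_statistics" and using placeholder cube/segment names:

    import org.apache.kylin.cube.CubeSegment

    // Illustrative only; "kylin_sales_cube" and "seg-0001" are placeholders.
    val seqPath = CubeSegment.getStatisticsResourcePath("kylin_sales_cube", "seg-0001", ".seq")
    // -> /cube_statistics/kylin_sales_cube/seg-0001.seq   (legacy sequence-file statistics)
    val jsonPath = CubeSegment.getStatisticsResourcePath("kylin_sales_cube", "seg-0001", ".json")
    // -> /cube_statistics/kylin_sales_cube/seg-0001.json  (precise statistics written by CubeBuildJob)
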
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 98dde2b..7867474 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -219,7 +219,7 @@ public abstract class SparkApplication {
         jobId = getParam(MetadataConstants.P_JOB_ID);
         project = getParam(MetadataConstants.P_PROJECT_NAME);
         if (getParam(MetadataConstants.P_CUBOID_NUMBER) != null) {
-            layoutSize = Integer.valueOf(getParam(MetadataConstants.P_CUBOID_NUMBER));
+            layoutSize = Integer.parseInt(getParam(MetadataConstants.P_CUBOID_NUMBER));
         }
         try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
                 .setAndUnsetThreadLocalConfig(MetaDumpUtil.loadKylinConfigFromHdfs(hdfsMetalUrl))) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
index 4ae87ef..51d7414 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
@@ -34,7 +34,7 @@ import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 public class NBuildSourceInfo {
     protected static final Logger logger = LoggerFactory.getLogger(NBuildSourceInfo.class);
 
-    private Dataset<Row> flattableDS;
+    private Dataset<Row> flatTableDS;
     private String viewFactTablePath;
     private SparkSession ss;
     private long byteSize;
@@ -52,12 +52,12 @@ public class NBuildSourceInfo {
         this.byteSize = byteSize;
     }
 
-    public void setFlattableDS(Dataset<Row> flattableDS) {
-        this.flattableDS = flattableDS;
+    public void setFlatTableDS(Dataset<Row> flatTableDS) {
+        this.flatTableDS = flatTableDS;
     }
 
-    public Dataset<Row> getFlattableDS() {
-        return flattableDS;
+    public Dataset<Row> getFlatTableDS() {
+        return flatTableDS;
     }
 
     public Dataset<Row> getParentDS() {
@@ -66,9 +66,9 @@ public class NBuildSourceInfo {
             Preconditions.checkNotNull(ss, "SparkSession is null is NBuildSourceInfo.");
             return ss.read().parquet(parentStoragePath);
         } else {
-            Preconditions.checkState(flattableDS != null, "Path and DS can no be empty at the same time.");
-            logger.info("parent storage path not exists, use flattable dataset.");
-            return flattableDS;
+            Preconditions.checkState(flatTableDS != null, "Path and DS can no be empty at the same time.");
+            logger.info("parent storage path not exists, use flatTable dataset.");
+            return flatTableDS;
         }
     }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 38a101b..1028911 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -59,6 +59,8 @@ import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
@@ -69,6 +71,7 @@ import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
 
+import scala.Tuple2;
 import scala.collection.JavaConversions;
 
 public class CubeBuildJob extends SparkApplication {
@@ -113,6 +116,7 @@ public class CubeBuildJob extends SparkApplication {
             // choose source
             ParentSourceChooser sourceChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, true);
             sourceChooser.decideSources();
+            Tuple2<String, AggInfo>[] aggInfos = sourceChooser.getAggInfo();
             NBuildSourceInfo buildFromFlatTable = sourceChooser.flatTableSource();
             Map<Long, NBuildSourceInfo> buildFromLayouts = sourceChooser.reuseSources();
 
@@ -131,7 +135,8 @@ public class CubeBuildJob extends SparkApplication {
             infos.recordSpanningTree(segId, spanningTree);
 
             logger.info("Updating segment info");
-            updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlattableDS().count());
+            assert buildFromFlatTable != null;
+            updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlatTableDS().count());
         }
         updateSegmentSourceBytesSize(getParam(MetadataConstants.P_CUBE_ID),
                 ResourceDetectUtils.getSegmentSourceSize(shareDir));
@@ -157,6 +162,26 @@
         List<CubeSegment> cubeSegments = Lists.newArrayList();
         CubeSegment segment = cubeCopy.getSegmentById(segmentInfo.id());
         segment.setSizeKB(segmentInfo.getAllLayoutSize() / 1024);
+        List<String> cuboidStatics = new LinkedList<>();
+
+        String template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d}";
+        for (LayoutEntity layoutEntity : segmentInfo.getAllLayoutJava()) {
+            cuboidStatics.add(String.format(template, layoutEntity.getId(), layoutEntity.getRows(), layoutEntity.getByteSize()));
+        }
+
+        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext());
+        JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics);
+        for (String cuboid : cuboidStatics) {
+            logger.info("Statistics \t: {}", cuboid);
+        }
+        String path = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
+        logger.info("Saving {} {}", path, segmentInfo);
+        try {
+            cuboidStatRdd.saveAsTextFile(path);
+        } catch (Exception e) {
+            logger.error("Error", e);
+        }
+
         segment.setLastBuildTime(System.currentTimeMillis());
         segment.setLastBuildJobID(getParam(MetadataConstants.P_JOB_ID));
         segment.setInputRecords(sourceRowCount);
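Each layout now ends up as one JSON line under the segment's precise-statistics path. A minimal sketch of reading those lines back into a cuboid-to-rowcount map, assuming standard Spark JSON reading and a placeholder path:

    import org.apache.spark.sql.SparkSession

    // Sketch only: each saved line looks like {"cuboid":275, "rows": 9876, "size": 123456}.
    def readCuboidRowCounts(ss: SparkSession, statsPath: String): Map[Long, Long] = {
      import ss.implicits._
      ss.read.json(statsPath)        // one JSON object per layout
        .select("cuboid", "rows")
        .as[(Long, Long)]
        .collect()
        .toMap                       // cuboid id -> estimated row count
    }
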
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
index 54afadf..84beaf2 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
@@ -47,6 +47,7 @@ object CuboidAggregator {
     aggInternal(ss, dataSet, dimensions, measures, isSparkSql)
   }
 
+  //noinspection ScalaStyle
   def aggInternal(ss: SparkSession,
                   dataSet: DataFrame,
                   dimensions: util.Set[Integer],
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatistics.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatistics.scala
new file mode 100644
index 0000000..d0ea199
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatistics.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job
+
+
+import org.apache.kylin.engine.spark.metadata.SegmentInfo
+import org.apache.kylin.measure.hllc.HLLCounter
+import org.apache.kylin.shaded.com.google.common.hash.{HashFunction, Hashing}
+import org.apache.spark.sql.{Dataset, Row}
+
+import scala.collection.mutable
+
+
+object CuboidStatistics {
+
+  def sample(inputDs: Dataset[Row], seg: SegmentInfo): Array[(String, AggInfo)] = {
+    seg.getAllLayout.map(x => x.getId)
+    val rkc = seg.allColumns.count(c => c.rowKey)
+    val res = inputDs.rdd //.sample(withReplacement = false, 0.3)
+      .mapPartitions(new CuboidStatistics(seg.getAllLayout.map(x => x.getId), rkc).statisticsInPartition)
+    val l = res.map(a => (a.key, a)).reduceByKey((a, b) => a.merge(b)).collect()
+    l.foreach(x => println(x._1 + " >>><<< " + x._2.cuboid.counter.getCountEstimate))
+    l
+  }
+}
+
+class CuboidStatistics(ids: List[Long], rkc: Int) extends Serializable {
+  private val info = mutable.Map[String, AggInfo]()
+  val allCuboidsBitSet: Array[Array[Integer]] = getCuboidBitSet(ids, rkc)
+  private val hf: HashFunction = Hashing.murmur3_128
+  private val rowHashCodesLong = new Array[Long](rkc)
+
+  def statisticsInPartition(rows: Iterator[Row]): Iterator[AggInfo] = {
+    init()
+    rows.foreach(update)
+    info.valuesIterator
+  }
+
+  def init(): Unit = {
+    ids.foreach(i => info.put(i.toString, AggInfo(i.toString)))
+  }
+
+  def update(r: Row): Unit = {
+    println(r)
+    updateCuboid(r)
+  }
+
+  def updateCuboid(r: Row): Unit = {
+    // generate hash for each row key column
+
+    var idx = 0
+    while (idx < rkc) {
+      val hc = hf.newHasher
+      var colValue = r.get(idx).toString
+      if (colValue == null) colValue = "0"
+      //add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+      rowHashCodesLong(idx) = hc.putUnencodedChars(colValue).hash().padToLong() + idx
+      idx += 1
+    }
+
+    // use the row key column hash to get a consolidated hash for each cuboid
+    val n = allCuboidsBitSet.length
+    idx = 0
+    while (idx < n) {
+      var value: Long = 0
+      var position = 0
+      while (position < allCuboidsBitSet(idx).length) {
+        value += rowHashCodesLong(allCuboidsBitSet(idx)(position))
+        position += 1
+      }
+      info(ids(idx).toString).cuboid.counter.addHashDirectly(value)
+      idx += 1
+    }
+  }
+
+  def getCuboidBitSet(cuboidIds: List[Long], nRowKey: Int): Array[Array[Integer]] = {
+    val allCuboidsBitSet: Array[Array[Integer]] = new Array[Array[Integer]](cuboidIds.length)
+    var j: Int = 0
+    while (j < cuboidIds.length) {
+      val cuboidId: Long = cuboidIds(j)
+      allCuboidsBitSet(j) = new Array[Integer](java.lang.Long.bitCount(cuboidId))
+      var mask: Long = 1L << (nRowKey - 1)
+      var position: Int = 0
+      var i: Int = 0
+      while (i < nRowKey) {
+        if ((mask & cuboidId) > 0) {
+          allCuboidsBitSet(j)(position) = i
+          position += 1
+        }
+        mask = mask >> 1
+        i += 1
+      }
+      j += 1
+    }
+    allCuboidsBitSet
+  }
+}
+
+case class AggInfo(key: String,
+                   cuboid: CuboidInfo = CuboidInfo(new HLLCounter()),
+                   sample: SampleInfo = SampleInfo(),
+                   dimension: DimensionInfo = DimensionInfo()) {
+  def merge(o: AggInfo): AggInfo = {
+    this.cuboid.counter.merge(o.cuboid.counter)
+    this
+  }
+}
+
+case class CuboidInfo(counter: HLLCounter = new HLLCounter(14))
+
+case class SampleInfo(data: Array[String] = new Array(3))
+
+case class DimensionInfo(range: mutable.Map[String, String] = mutable.Map[String, String]())
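The estimate behind CuboidStatistics is that a cuboid's row count equals the number of distinct combinations of its row-key columns, which an HLLCounter can approximate from hashes alone. A self-contained sketch of that single step, using the HLLCounter and Hashing calls seen above on fabricated sample rows:

    import org.apache.kylin.measure.hllc.HLLCounter
    import org.apache.kylin.shaded.com.google.common.hash.Hashing

    // Estimate the row count of one cuboid that keeps row-key ordinals 0 and 2:
    // hash each kept value, offset it by its ordinal, sum the hashes per row,
    // and let HLL count the distinct sums.
    val hf = Hashing.murmur3_128
    val counter = new HLLCounter(14)                      // 2^14 registers, as in CuboidInfo
    val rows = Seq(Array("CN", "2020-11-23", "seller-1"), // fabricated sample rows
                   Array("CN", "2020-11-23", "seller-2"),
                   Array("US", "2020-11-24", "seller-1"))
    val keptOrdinals = Array(0, 2)
    rows.foreach { row =>
      val combined = keptOrdinals.map(i => hf.newHasher.putUnencodedChars(row(i)).hash().padToLong() + i).sum
      counter.addHashDirectly(combined)
    }
    println(counter.getCountEstimate)                     // ~3 distinct (col0, col2) combinations
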
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 7697fd6..446f3e6 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -39,17 +39,25 @@ class ParentSourceChooser(
     config: KylinConfig,
     needEncoding: Boolean) extends Logging {
 
+  var aggInfo : Array[(String, AggInfo)] = _
+
   // build from built cuboid.
   var reuseSources: java.util.Map[java.lang.Long, NBuildSourceInfo] = Maps.newHashMap()
 
-  // build from flattable.
+  // build from flatTable.
   var flatTableSource: NBuildSourceInfo = _
 
+  var detectStep = false
+
   //TODO: MetadataConverter don't have getCubeDesc() now
   /*val flatTableDesc = new CubeJoinedFlatTableDesc(
     MetadataConverter.getCubeDesc(seg.getCube),
     ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree))*/
+  def setDetectStep(): Unit =
+    detectStep = true
+
+  def getAggInfo : Array[(String, AggInfo)] = aggInfo
 
   def decideSources(): Unit = {
     toBuildTree.getRootIndexEntities.asScala.foreach { entity =>
@@ -72,7 +80,19 @@ class ParentSourceChooser(
           builder.checkDupKey()
           seg = builder.buildSnapshot
         }
-        flatTableSource = getFlatTable()
+        flatTableSource = getFlatTable
+
+        val rowKeyColumns: Seq[String] = seg.allColumns.filter(c => c.rowKey).map(c => c.id.toString)
+        if (aggInfo == null && !detectStep) {
+          logInfo("Start sampling ...")
+          val coreDs = flatTableSource.getFlatTableDS.select(rowKeyColumns.head, rowKeyColumns.tail: _*)
+          aggInfo = CuboidStatistics.sample(coreDs, seg)
+          logInfo("Finish sampling ...")
+          val statisticsStr = aggInfo.sortBy(x => x._1).map(x => x._1 + ":" + x._2.cuboid.counter.getCountEstimate).mkString("\n")
+          logInfo(statisticsStr)
+        } else {
+          logInfo("Skip sampling ...")
+        }
       }
       flatTableSource.addCuboid(entity)
     }
@@ -91,7 +111,7 @@ class ParentSourceChooser(
     var path = ""
     if (flatTableSource != null &&
       flatTableSource.getToBuildCuboids.size() > config.getPersistFlatTableThreshold) {
-      val df = flatTableSource.getFlattableDS
+      val df = flatTableSource.getFlatTableDS
       if (df.schema.nonEmpty) {
         val allUsedCols = flatTableSource.getToBuildCuboids.asScala.flatMap { c =>
           val dims = c.getOrderedDimensions.keySet().asScala.map(_.toString)
@@ -152,7 +172,7 @@ class ParentSourceChooser(
     buildSource
   }
 
-  private def getFlatTable(): NBuildSourceInfo = {
+  private def getFlatTable: NBuildSourceInfo = {
 //    val viewPath = persistFactViewIfNecessary()
     val sourceInfo = new NBuildSourceInfo
     sourceInfo.setSparkSession(ss)
@@ -162,7 +182,7 @@ class ParentSourceChooser(
 //    val needJoin = ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree)
     val flatTable = new CreateFlatTable(seg, toBuildTree, ss, sourceInfo)
     val afterJoin: Dataset[Row] = flatTable.generateDataset(needEncoding, true)
-    sourceInfo.setFlattableDS(afterJoin)
+    sourceInfo.setFlatTableDS(afterJoin)
 
     logInfo("No suitable ready layouts could be reused, generate dataset from flat table.")
     sourceInfo
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
index 065bd5e..c3828cd 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
@@ -58,6 +58,7 @@ public class ResourceDetectBeforeCubingJob extends SparkApplication {
         ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, jobId), ResourceDetectUtils.countDistinctSuffix()),
                 ResourceDetectUtils.findCountDistinctMeasure(JavaConversions.asJavaCollection(seg.toBuildLayouts())));
         ParentSourceChooser datasetChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, false);
+        datasetChooser.setDetectStep();
         datasetChooser.decideSources();
         NBuildSourceInfo buildFromFlatTable = datasetChooser.flatTableSource();
         if (buildFromFlatTable != null) {
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
index d12e9ea..e81aa8e 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
@@ -31,15 +31,16 @@ class ColumnDesc(val columnName: String,
                  val dataType: DataType,
                  val tableName: String,
                  val tableAliasName: String,
-                 val id: Int) extends Serializable {
+                 val id: Int,
+                 val rowKey: Boolean = false) extends Serializable {
   def identity: String = s"$tableAliasName.$columnName"
 
   def isColumnType: Boolean = true
 }
 
 object ColumnDesc {
-  def apply(columnName: String, dataType: DataType, tableName: String, tableAliasName: String, id: Int):
-  ColumnDesc = new ColumnDesc(columnName, dataType, tableName, tableAliasName, id)
+  def apply(columnName: String, dataType: DataType, tableName: String, tableAliasName: String, id: Int, rowKey: Boolean):
+  ColumnDesc = new ColumnDesc(columnName, dataType, tableName, tableAliasName, id, rowKey)
 }
 
 case class LiteralColumnDesc(override val columnName: String,
@@ -120,11 +121,21 @@ case class SegmentInfo(id: String,
     snapshotInfo = tableInfo
   }
 
-  def getAllLayoutSize(): Long = {
+  def getAllLayoutSize: Long = {
     layouts.map(_.getByteSize).sum
   }
 
-  def getSnapShot2JavaMap(): java.util.Map[String, String] = {
+  def getAllLayout: List[LayoutEntity] = {
+    layouts
+  }
+
+  def getAllLayoutJava: java.util.List[LayoutEntity] = {
+    val l: util.LinkedList[LayoutEntity] = new java.util.LinkedList()
+    layouts.foreach(o => l.add(o))
+    l
+  }
+
+  def getSnapShot2JavaMap: java.util.Map[String, String] = {
     snapshotInfo.asJava
   }
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
index 970ff7b..30067ae 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
@@ -110,7 +110,7 @@ object MetadataConverter {
     val colToIndex = columnIDTuples.toMap
     columnIDTuples
       .foreach { co =>
-        dimensionIndex.put(co._2, toColumnDesc(co._1, co._2))
+        dimensionIndex.put(co._2, toColumnDesc(co._1, co._2, set.contains(co._1)))
       }
     dimensionIndex
   }
@@ -159,9 +159,9 @@ object MetadataConverter {
     val dimensionMap = dimensionMapping.toMap
     val shardByColumnsId = shardByColumns.asScala.toList
-        .map(column => dimensionMap.get(column))
-        .filter(v => v != null)
-        .map(column => Integer.valueOf(column.get))
+      .map(column => dimensionMap.get(column))
+      .filter(v => v != null)
+      .map(column => Integer.valueOf(column.get))
 
     val set = dimensionMapping.map(_._1).toSet
     val refs = cubeInstance.getAllColumns.asScala.diff(set)
@@ -204,13 +204,13 @@
     extractEntityAndMeasures(cubeInstance)._1.asJava
   }
 
-  private def toColumnDesc(ref: TblColRef, index: Int = -1) = {
+  private def toColumnDesc(ref: TblColRef, index: Int = -1, rowKey: Boolean = false) = {
     val dataType = SparkTypeUtil.toSparkType(KyDataType.getType(ref.getDatatype))
     val columnDesc = if (ref.getColumnDesc.isComputedColumn) {
       ComputedColumnDesc(ref.getName, dataType, ref.getTableRef.getTableName, ref.getTableRef.getAlias, index,
         ref.getExpressionInSourceDB)
     } else {
-      ColumnDesc(ref.getName, dataType, ref.getTableRef.getTableName, ref.getTableRef.getAlias, index)
+      ColumnDesc(ref.getName, dataType, ref.getTableRef.getTableName, ref.getTableRef.getAlias, index, rowKey)
    }
     columnDesc
   }
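For reference, the bit convention that getCuboidBitSet in CuboidStatistics.scala unpacks: the highest of the nRowKey bits of a cuboid id corresponds to row-key ordinal 0. A small worked example with made-up numbers:

    // With 4 row-key columns, cuboid id 10 (binary 1010) keeps ordinals 0 and 2:
    // bit 3 (mask 1000) -> column 0, bit 1 (mask 0010) -> column 2.
    val nRowKey = 4
    val cuboidId = 10L
    val ordinals = (0 until nRowKey).filter { i =>
      val mask = 1L << (nRowKey - 1 - i)   // same scan order as getCuboidBitSet
      (cuboidId & mask) != 0
    }
    println(ordinals)                      // Vector(0, 2)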