This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

The following commit(s) were added to refs/heads/master by this push:
     new 5be135c  [KYLIN-4940] Implement the step of "Extract Dictionary from Global Dictionary" for spark cubing engine
5be135c is described below

commit 5be135c61c80b9241ac7632dec41cc7209b9d23a
Author: yangjiang <yangji...@ebay.com>
AuthorDate: Tue Jan 19 11:34:35 2021 +0800

    [KYLIN-4940] Implement the step of "Extract Dictionary from Global Dictionary" for spark cubing engine
---
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |   4 +
 .../kylin/engine/spark/SparkCubingByLayer.java     | 174 ++++++++++++++++++++-
 .../org/apache/kylin/engine/spark/SparkUtil.java   |  40 +++++
 3 files changed, 211 insertions(+), 7 deletions(-)

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 7d6a367..4b43318 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -201,6 +201,10 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
                 getSegmentMetadataUrl(seg.getConfig(), jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
+        if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+            sparkExecutable.setParam(SparkCubingByLayer.OPTION_SHRUNK_INPUT_PATH.getOpt(),
+                    getShrunkenDictionaryPath(jobId));
+        }
         sparkExecutable.setJobId(jobId);
 
         StringBuilder jars = new StringBuilder();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index cbedc8b..09c68c9 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,21 +17,28 @@
  */
 package org.apache.kylin.engine.spark;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeDescManager;
@@ -44,6 +51,8 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.ShrunkenDictionary;
+import org.apache.kylin.dict.ShrunkenDictionaryBuilder;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.IMROutput2;
@@ -59,13 +68,19 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +106,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_SHRUNK_INPUT_PATH = OptionBuilder
+            .withArgName(BatchConstants.ARG_SHRUNKEN_DICT_PATH).hasArg().isRequired(false)
+            .withDescription("shrunken Dictionary Path").create(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
 
     private Options options;
 
@@ -102,6 +120,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_SHRUNK_INPUT_PATH);
     }
 
     @Override
@@ -117,6 +136,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String shrunkInputPath = optionsHelper.getOptionValue(OPTION_SHRUNK_INPUT_PATH);
+        logger.info("shrunkInputPath is {}", shrunkInputPath);
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
 
@@ -130,7 +151,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         JavaSparkContext sc = new JavaSparkContext(conf);
         sc.sc().addSparkListener(jobListener);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
+        SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob
+                .loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
+
+        if (shrunkInputPath != null)
+            sc.hadoopConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkInputPath);
+
         final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
         KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
@@ -164,9 +190,21 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         boolean isSequenceFile =
                 JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = SparkUtil
-                .hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
-                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD;
+        logger.info("isShrunkenDictFromGlobalEnabled {} shrunkInputPath is {}",
+                cubeDesc.isShrunkenDictFromGlobalEnabled(), shrunkInputPath);
+
+        JavaRDD<String[]> recordInputRDD = null;
+
+        if (cubeDesc.isShrunkenDictFromGlobalEnabled() && shrunkInputPath != null) {
+            recordInputRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable).cache();
+            recordInputRDD
+                    .foreachPartition(new CreateShrunkenDictionary(cubeName, cubeDesc, cubeSegment, envConfig, sConf));
+            encodedBaseRDD = recordInputRDD.mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+        } else {
+            encodedBaseRDD = SparkUtil.hiveRecordInputRDD(isSequenceFile, sc, inputPath, hiveTable)
+                    .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+        }
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
@@ -190,6 +228,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job, envConfig);
 
+        // use ShrunkenDictionary should unpersist recordInputRDD
+        if (recordInputRDD != null) {
+            recordInputRDD.unpersist();
+        }
+
         PairFlatMapFunction flatMapFunction = new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf);
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
@@ -286,9 +329,16 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                             EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
                     long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
                     Cuboid baseCuboid = Cuboid.findForMandatory(cubeDesc, baseCuboidId);
-                    baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
-                            AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
-                            MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+                    String splitKey = String.valueOf(TaskContext.getPartitionId());
+                    try {
+                        baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc,
+                                AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+                                MeasureIngester.create(cubeDesc.getMeasures()),
+                                SparkUtil.getDictionaryMap(cubeSegment, splitKey, conf.get()));
+                    } catch (IOException e) {
+                        logger.error("Fail to get shrunk dict");
+                        e.printStackTrace();
+                    }
                     initialized = true;
                 }
             }
@@ -466,4 +516,114 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return count;
     }
 
+    public static class CreateShrunkenDictionary implements VoidFunction<Iterator<String[]>> {
+        private String cubeName;
+        private CubeDesc cubeDesc;
+        private CubeSegment cubeSeg;
+
+        private KylinConfig kylinConfig;
+        private SerializableConfiguration scof;
+        private CubeJoinedFlatTableEnrich intermediateTableDesc;
+
+        private List<TblColRef> globalColumns;
+        private int[] globalColumnIndex;
+        private List<Set<String>> globalColumnValues;
+
+        private volatile transient boolean initialized = false;
+
+        private String splitKey;
+
+        public CreateShrunkenDictionary(String cubeName, CubeDesc cubeDesc, CubeSegment cubeSegment, KylinConfig kylinConfig,
+                SerializableConfiguration serializableConfiguration) {
+            this.cubeName = cubeName;
+            this.cubeDesc = cubeDesc;
+            this.cubeSeg = cubeSegment;
+            this.kylinConfig = kylinConfig;
+            this.scof = serializableConfiguration;
+            this.intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg),
+                    cubeDesc);
+        }
+
+        public void init() {
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                globalColumns = cubeDesc.getAllGlobalDictColumnsNeedBuilt();
+                globalColumnIndex = new int[globalColumns.size()];
+                globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size());
+
+                splitKey = String.valueOf(TaskContext.getPartitionId());
+
+                for (int i = 0; i < globalColumns.size(); i++) {
+                    TblColRef colRef = globalColumns.get(i);
+                    int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+                    globalColumnIndex[i] = columnIndexOnFlatTbl;
+                    globalColumnValues.add(Sets.<String>newHashSet());
+                }
+            }
+        }
+
+        @Override
+        public void call(Iterator<String[]> iter) throws Exception {
+
+            if (initialized == false) {
+                synchronized (CreateShrunkenDictionary.class) {
+                    if (initialized == false) {
+                        init();
+                        initialized = true;
+                    }
+                }
+            }
+            int count = 0;
+            while (iter.hasNext()) {
+                count++;
+                String[] rowArray = iter.next();
+                for (int i = 0; i < globalColumnIndex.length; i++) {
+                    String fieldValue = rowArray[globalColumnIndex[i]];
+                    if (fieldValue == null)
+                        continue;
+                    globalColumnValues.get(i).add(fieldValue);
+                }
+            }
+
+            FileSystem fs = FileSystem.get(scof.get());
+            Path outputDirBase = new Path(scof.get().get(BatchConstants.ARG_SHRUNKEN_DICT_PATH));
+
+            Map<TblColRef, Dictionary<String>> globalDictionaryMap = cubeSeg
+                    .buildGlobalDictionaryMap(globalColumns.size());
+
+            ShrunkenDictionary.StringValueSerializer strValueSerializer = new ShrunkenDictionary.StringValueSerializer();
+
+            for (int i = 0; i < globalColumns.size(); i++) {
+                List<String> colDistinctValues = Lists.newArrayList(globalColumnValues.get(i));
+                if (colDistinctValues.size() == 0) {
+                    continue;
+                }
+                // sort values to accelerate the encoding process by reducing the swapping of global dictionary slices
+                Collections.sort(colDistinctValues);
+
+                //only get one col dict
+                ShrunkenDictionaryBuilder<String> dictBuilder = new ShrunkenDictionaryBuilder<>(
+                        globalDictionaryMap.get(globalColumns.get(i)));
+
+                for (String colValue : colDistinctValues) {
+                    dictBuilder.addValue(colValue);
+                }
+
+                Dictionary<String> shrunkenDict = dictBuilder.build(strValueSerializer);
+
+                Path colDictDir = new Path(outputDirBase, globalColumns.get(i).getIdentity());
+
+                if (!fs.exists(colDictDir)) {
+                    fs.mkdirs(colDictDir);
+                }
+                Path shrunkenDictPath = new Path(colDictDir, splitKey);
+                try (DataOutputStream dos = fs.create(shrunkenDictPath)) {
+                    logger.info("Write Shrunken dictionary to {} success", shrunkenDictPath);
+                    shrunkenDict.write(dos);
+                }
+            }
+
+        }
+    }
+
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index b963252..d146c85 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -18,9 +18,12 @@
 
 package org.apache.kylin.engine.spark;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,14 +33,17 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.ShrunkenDictionary;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.SparkContext;
@@ -53,9 +59,13 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.hive.HiveUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SparkUtil {
 
+    private static final Logger logger = LoggerFactory.getLogger(SparkUtil.class);
+
     public static ISparkBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
         return (ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
@@ -188,4 +198,34 @@ public class SparkUtil {
         });
     }
 
+    public static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeSegment cubeSegment, String splitKey,
+            Configuration configuration) throws IOException {
+        Map<TblColRef, Dictionary<String>> dictionaryMap = cubeSegment.buildDictionaryMap();
+
+        String shrunkenDictPath = configuration.get(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
+        if (shrunkenDictPath == null) {
+            return dictionaryMap;
+        }
+
+        // replace global dictionary with shrunken dictionary if possible
+        FileSystem fs = FileSystem.get(configuration);
+        ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer();
+        for (TblColRef colRef : cubeSegment.getCubeDesc().getAllGlobalDictColumnsNeedBuilt()) {
+            Path colShrunkenDictDir = new Path(shrunkenDictPath, colRef.getIdentity());
+            Path colShrunkenDictPath = new Path(colShrunkenDictDir, splitKey);
+            if (!fs.exists(colShrunkenDictPath)) {
+                logger.warn("Shrunken dictionary for column " + colRef.getIdentity() + " in split " + splitKey
+                        + " does not exist!!!");
+                continue;
+            }
+            try (DataInputStream dis = fs.open(colShrunkenDictPath)) {
+                Dictionary<String> shrunkenDict = new ShrunkenDictionary(valueSerializer);
+                shrunkenDict.readFields(dis);
+                logger.info("Read Shrunken dictionary from {} success", colShrunkenDictPath);
+                dictionaryMap.put(colRef, shrunkenDict);
+            }
+        }
+
+        return dictionaryMap;
+    }
 }
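
A rough sketch of how the pieces above fit together, not part of the patch: CreateShrunkenDictionary runs once per input partition, collects the distinct values of every column that needs a global dictionary, shrinks the global dictionary down to just those values, and writes the result under <shrunken dict path>/<column identity>/<partition id>. SparkUtil.getDictionaryMap then lets each EncodeBaseCuboid task load the shrunken dictionary for its own split (keyed by TaskContext.getPartitionId()), falling back to the full global dictionary for any column whose file is absent. The helper below only illustrates that path convention; the class name, method, and arguments are invented for the example and do not appear in the commit.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ShrunkenDictPathSketch {
        // Returns the per-split shrunken dictionary file, or null if this split wrote none,
        // in which case the caller keeps using the full global dictionary.
        static Path resolveShrunkenDict(Configuration conf, String shrunkenDictRoot,
                String columnIdentity, int partitionId) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            // layout written by CreateShrunkenDictionary: <root>/<column identity>/<partition id>
            Path candidate = new Path(new Path(shrunkenDictRoot, columnIdentity),
                    String.valueOf(partitionId));
            return fs.exists(candidate) ? candidate : null;
        }
    }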