KYLIN-2811, refine spark cubing

Signed-off-by: Hongbin Ma <mahong...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a1c234a9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a1c234a9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a1c234a9

Branch: refs/heads/2622-2764
Commit: a1c234a9afbc5a30306f4275127649f980ab75bd
Parents: 4316cfd
Author: Cheng Wang <cheng.w...@kyligence.io>
Authored: Sun Aug 27 13:29:12 2017 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Sun Aug 27 15:08:18 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  50 +--
 .../metadata/model/ComputedColumnDesc.java      |  10 +-
 .../kylin/engine/mr/common/BatchConstants.java  |   1 +
 .../spark/SparkBatchCubingJobBuilder2.java      |   1 +
 .../kylin/engine/spark/SparkCubingByLayer.java  | 380 ++++++++++++-------
 .../localmeta/cube_desc/ci_inner_join_cube.json |   2 +-
 .../kylin/rest/util/Log4jConfigListener.java    |   1 +
 7 files changed, 267 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index a56e9b8..1d5e0ec 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -18,6 +18,15 @@
 
 package org.apache.kylin.common;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OrderedProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -34,26 +43,15 @@ import java.util.Enumeration;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.restclient.RestClient;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.OrderedProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
 /**
  */
 public class KylinConfig extends KylinConfigBase {
     private static final long serialVersionUID = 1L;
     private static final Logger logger = LoggerFactory.getLogger(KylinConfig.class);
 
-    /** Kylin properties file name */
+    /**
+     * Kylin properties file name
+     */
     public static final String KYLIN_CONF_PROPERTIES_FILE = "kylin.properties";
 
     public static final String KYLIN_CONF = "KYLIN_CONF";
@@ -62,7 +60,7 @@ public class KylinConfig extends KylinConfigBase {
 
     // thread-local instances, will override SYS_ENV_INSTANCE
     private static transient ThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = new ThreadLocal<>();
-
+    
     static {
         /*
          * Make Calcite to work with Unicode.
@@ -121,9 +119,6 @@ public class KylinConfig extends KylinConfigBase { } private static UriType decideUriType(String metaUri) { - if (metaUri.indexOf("@hdfs") > 0) { - return UriType.HDFS_FILE; - } try { File file = new File(metaUri); @@ -163,23 +158,6 @@ public class KylinConfig extends KylinConfigBase { */ UriType uriType = decideUriType(uri); - if (uriType == UriType.HDFS_FILE) { - KylinConfig config; - FileSystem fs; - int cut = uri.indexOf('@'); - String realHdfsPath = uri.substring(0, cut) + "/" + KYLIN_CONF_PROPERTIES_FILE; - try { - config = new KylinConfig(); - fs = HadoopUtil.getFileSystem(realHdfsPath); - InputStream is = fs.open(new Path(realHdfsPath)); - Properties prop = streamToProps(is); - config.reloadKylinConfig(prop); - } catch (IOException e) { - throw new RuntimeException(e); - } - return config; - } - if (uriType == UriType.LOCAL_FOLDER) { KylinConfig config = new KylinConfig(); config.setMetadataUrl(uri); @@ -241,7 +219,7 @@ public class KylinConfig extends KylinConfigBase { public static void setKylinConfigThreadLocal(KylinConfig config) { THREAD_ENV_INSTANCE.set(config); } - + public static void removeKylinConfigThreadLocal() { THREAD_ENV_INSTANCE.remove(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java index e8cc351..2ee2b38 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java @@ -17,12 +17,13 @@ */ package org.apache.kylin.metadata.model; +import java.io.Serializable; + +import org.apache.kylin.metadata.model.tool.CalciteParser; + import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import org.apache.kylin.metadata.model.tool.CalciteParser; - -import java.io.Serializable; @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class ComputedColumnDesc implements Serializable { @@ -46,7 +47,8 @@ public class ComputedColumnDesc implements Serializable { tableIdentity = tableIdentity.toUpperCase(); columnName = columnName.toUpperCase(); - CalciteParser.ensureNoTableNameExists(expression); + if ("true".equals(System.getProperty("needCheckCC"))) + CalciteParser.ensureNoTableNameExists(expression); } public String getFullName() { http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 0cb23ac..1ca7024 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -63,6 +63,7 @@ public interface BatchConstants { String CFG_OUTPUT_DICT = "dict"; String CFG_OUTPUT_STATISTICS = "statistics"; 
String CFG_OUTPUT_PARTITION = "partition"; + String CFG_MR_SPARK_JOB = "mr.spark.job"; /** * command line ARGuments http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 07bc334..f1e6aea 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -49,6 +49,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), seg.getUuid())); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath); StringBuilder jars = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index a8e7378..a3a6ad0 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -17,18 +17,28 @@ */ package org.apache.kylin.engine.spark; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -41,6 +51,8 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.BatchCubingJobBuilder2; +import org.apache.kylin.engine.mr.IMROutput2; +import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.BaseCuboidBuilder; import 
org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsReader; @@ -50,27 +62,22 @@ import org.apache.kylin.measure.MeasureAggregators; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.spark.SparkConf; +import org.apache.spark.SparkFiles; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.hive.HiveContext; import org.apache.spark.storage.StorageLevel; +import org.apache.spark.util.SizeEstimator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Tuple2; -import java.io.IOException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import scala.Tuple2; /** * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase. @@ -79,13 +86,21 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class); - public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); - public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId"); - public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true).withDescription("HDFS metadata url").create("metaUrl"); - public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); - public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable"); + public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg() + .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); + public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true) + .withDescription("Cube Segment Id").create("segmentId"); + public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true) + .withDescription("HDFS metadata url").create("metaUrl"); + public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg() + .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); + public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true) + .withDescription("Hive Intermediate Table").create("hiveTable"); + public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true) + .withDescription("Configuration Path").create("confPath"); private Options options; + 
private static String metaUrl; public SparkCubingByLayer() { options = new Options(); @@ -94,6 +109,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_META_URL); options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_CONF_PATH); } @Override @@ -101,41 +117,44 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa return options; } - public static KylinConfig loadKylinConfig(String metaUrl) throws IOException { - KylinConfig kylinConfig = KylinConfig.createInstanceFromUri(metaUrl); - KylinConfig.setKylinConfigThreadLocal(kylinConfig); - return kylinConfig; + public static KylinConfig getKylinConfigForExecutor() { + File file = new File(SparkFiles.get(KylinConfig.KYLIN_CONF_PROPERTIES_FILE)); + String confPath = file.getParentFile().getAbsolutePath(); + System.setProperty(KylinConfig.KYLIN_CONF, confPath); + final KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setMetadataUrl(metaUrl); + return config; } @Override protected void execute(OptionsHelper optionsHelper) throws Exception { - final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE); - final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); - final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); - final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL); - final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); + metaUrl = optionsHelper.getOptionValue(OPTION_META_URL); + String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE); + String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH); + String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); + + Class[] kryoClassArray = new Class[] { org.apache.hadoop.io.Text.class, + Class.forName("scala.reflect.ClassTag$$anon$1"), java.lang.Class.class }; SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId); //serialization conf conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator"); - conf.set("spark.kryo.registrationRequired", "true"); + conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray); + JavaSparkContext sc = new JavaSparkContext(conf); HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath)); - final KylinConfig kylinConfig = loadKylinConfig(metaUrl); - HiveContext sqlContext = new HiveContext(sc.sc()); - final DataFrame intermediateTable = sqlContext.table(hiveTable); - final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + sc.addFile(confPath + File.separator + KylinConfig.KYLIN_CONF_PROPERTIES_FILE); + System.setProperty(KylinConfig.KYLIN_CONF, confPath); + KylinConfig envConfig = KylinConfig.getInstanceFromEnv(); + + final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName); final CubeDesc cubeDesc = cubeInstance.getDescriptor(); final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); - final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); - final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc); - final 
Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment); - final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue())); - final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue())); - final int measureNum = cubeDesc.getMeasures().size(); int countMeasureIndex = 0; for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { if (measureDesc.getFunction().isCount() == true) { @@ -144,7 +163,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa countMeasureIndex++; } } - final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig); + final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig); boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()]; boolean allNormalMeasure = true; for (int i = 0; i < cubeDesc.getMeasures().size(); i++) { @@ -153,141 +172,212 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa } logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure); StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER(); - // encode with dimension encoding, transform to <ByteArray, Object[]> RDD - final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() { - volatile transient boolean initialized = false; - BaseCuboidBuilder baseCuboidBuilder = null; - - @Override - public Tuple2<ByteArray, Object[]> call(Row row) throws Exception { - if (initialized == false) { - synchronized (SparkCubingByLayer.class) { - if (initialized == false) { - loadKylinConfig(metaUrl); - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); - initialized = true; - } - } - } - - String[] rowArray = rowToArray(row); - baseCuboidBuilder.resetAggrs(); - byte[] rowKey = baseCuboidBuilder.buildKey(rowArray); - Object[] result = baseCuboidBuilder.buildValueObjects(rowArray); - return new Tuple2<>(new ByteArray(rowKey), result); - } - private String[] rowToArray(Row row) { - String[] result = new String[row.size()]; - for (int i = 0; i < row.size(); i++) { - final Object o = row.get(i); - if (o != null) { - result[i] = o.toString(); - } else { - result[i] = null; - } - } - return result; - } + HiveContext sqlContext = new HiveContext(sc.sc()); + final DataFrame intermediateTable = sqlContext.table(hiveTable); - }); + // encode with dimension encoding, transform to <ByteArray, Object[]> RDD + final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD() + .mapToPair(new EncodeBaseCuboid(cubeName, segmentId)); - logger.info("encodedBaseRDD partition number: " + encodedBaseRDD.getNumPartitions()); Long totalCount = 0L; - if (kylinConfig.isSparkSanityCheckEnabled()) { + if (envConfig.isSparkSanityCheckEnabled()) { totalCount = encodedBaseRDD.count(); - logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count()); } - final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures()); - final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, 
vCubeDesc.getValue(), measureAggregators); + + final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName); BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction; if (allNormalMeasure == false) { - reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr); + reducerFunction2 = new CuboidReducerFunction2(cubeName, needAggr); } + final int totalLevels = cubeDesc.getBuildLevel(); JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1]; int level = 0; - int partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig); + long baseRDDSize = SizeEstimator.estimate(encodedBaseRDD) / (1024 * 1024); + int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) baseRDDSize); + // aggregate to calculate base cuboid allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel); Configuration confOverwrite = new Configuration(sc.hadoopConfiguration()); confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2 - saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite); + + saveToHDFS(allRDDs[0], cubeName, cubeSegment, outputPath, 0, confOverwrite); + // aggregate to ND cuboids - PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(metaUrl, vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder); for (level = 1; level <= totalLevels; level++) { - partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig); - logger.info("Level " + level + " partition number: " + partition); - allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel); - if (kylinConfig.isSparkSanityCheckEnabled() == true) { + long levelRddSize = SizeEstimator.estimate(allRDDs[level - 1]) / (1024 * 1024); + partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) levelRddSize); + allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId)) + .reduceByKey(reducerFunction2, partition).persist(storageLevel); + if (envConfig.isSparkSanityCheckEnabled() == true) { sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex); } - saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, confOverwrite); + saveToHDFS(allRDDs[level], cubeName, cubeSegment, outputPath, level, confOverwrite); allRDDs[level - 1].unpersist(); } allRDDs[totalLevels].unpersist(); logger.info("Finished on calculating all level cuboids."); - deleteHDFSMeta(metaUrl); } - private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) { - double baseCuboidSize = statsReader.estimateLayerSize(level); + private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig, + int rddSize) { + int baseCuboidSize = (int) Math.min(rddSize, statsReader.estimateLayerSize(level)); float rddCut = kylinConfig.getSparkRDDPartitionCutMB(); int partition = (int) (baseCuboidSize / rddCut); partition = Math.max(kylinConfig.getSparkMinPartition(), partition); partition = Math.min(kylinConfig.getSparkMaxPartition(), partition); - logger.debug("Estimated level " + level + " partition number: " + partition); return partition; } - private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc 
cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) { + private void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String cubeName, + final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Configuration conf) throws Exception { final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); - rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() { - BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures()); - @Override - public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception { - ByteBuffer valueBuf = codec.encode(tuple2._2()); - byte[] encodedBytes = new byte[valueBuf.position()]; - System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position()); - return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), new org.apache.hadoop.io.Text(encodedBytes)); - } - }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf); + Job job = Job.getInstance(conf); + IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat(); + outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, level); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeSeg.getCubeInstance().getName()); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, cubeSeg.getUuid()); + job.getConfiguration().set(BatchConstants.CFG_MR_SPARK_JOB, "spark"); + + rdd.mapToPair( + new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() { + private volatile transient boolean initialized = false; + BufferedMeasureCodec codec; + + @Override + public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call( + Tuple2<ByteArray, Object[]> tuple2) throws Exception { + if (!initialized) { + synchronized (SparkCubingByLayer.class) { + if (!initialized) { + KylinConfig kylinConfig = getKylinConfigForExecutor(); + CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName); + codec = new BufferedMeasureCodec(desc.getMeasures()); + initialized = true; + } + } + } + ByteBuffer valueBuf = codec.encode(tuple2._2()); + byte[] encodedBytes = new byte[valueBuf.position()]; + System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position()); + return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), + new org.apache.hadoop.io.Text(encodedBytes)); + } + }).sortByKey().saveAsNewAPIHadoopDataset(job.getConfiguration()); logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath); } - class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> { - CubeDesc cubeDesc; - int measureNum; - MeasureAggregators aggregators; + static class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> { + private volatile transient boolean initialized = false; + private BaseCuboidBuilder baseCuboidBuilder = null; + private String cubeName; + private String segmentId; + + public EncodeBaseCuboid(String cubeName, String segmentId) { + this.cubeName = cubeName; + this.segmentId = segmentId; + } + + @Override + public Tuple2<ByteArray, Object[]> call(Row row) throws Exception { + if (initialized == false) { + synchronized 
(SparkCubingByLayer.class) { + if (initialized == false) { + KylinConfig kConfig = getKylinConfigForExecutor(); + CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName); + CubeDesc cubeDesc = cubeInstance.getDescriptor(); + CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + CubeJoinedFlatTableEnrich interDesc = new CubeJoinedFlatTableEnrich( + EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); + long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + baseCuboidBuilder = new BaseCuboidBuilder(kConfig, cubeDesc, cubeSegment, interDesc, + AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), + MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); + initialized = true; + } + } + } + String[] rowArray = rowToArray(row); + baseCuboidBuilder.resetAggrs(); + byte[] rowKey = baseCuboidBuilder.buildKey(rowArray); + Object[] result = baseCuboidBuilder.buildValueObjects(rowArray); + return new Tuple2<>(new ByteArray(rowKey), result); + } + + private String[] rowToArray(Row row) { + String[] result = new String[row.size()]; + for (int i = 0; i < row.size(); i++) { + final Object o = row.get(i); + if (o != null) { + result[i] = o.toString(); + } else { + result[i] = null; + } + } + return result; + } + } + + static class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> { + protected String cubeName; + protected CubeDesc cubeDesc; + protected int measureNum; + protected MeasureAggregators aggregators; + protected volatile transient boolean initialized = false; - BaseCuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, MeasureAggregators aggregators) { - this.cubeDesc = cubeDesc; - this.measureNum = measureNum; - this.aggregators = aggregators; + public BaseCuboidReducerFunction2(String cubeName) { + this.cubeName = cubeName; + } + + public void init() { + KylinConfig kConfig = getKylinConfigForExecutor(); + CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName); + cubeDesc = cubeInstance.getDescriptor(); + aggregators = new MeasureAggregators(cubeDesc.getMeasures()); + measureNum = cubeDesc.getMeasures().size(); } @Override public Object[] call(Object[] input1, Object[] input2) throws Exception { + if (initialized == false) { + synchronized (SparkCubingByLayer.class) { + if (initialized == false) { + init(); + initialized = true; + } + } + } Object[] result = new Object[measureNum]; aggregators.aggregate(input1, input2, result); return result; } } - class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 { - boolean[] needAggr; + static class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 { + private boolean[] needAggr; - CuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, MeasureAggregators aggregators, boolean[] needAggr) { - super(measureNum, cubeDesc, aggregators); + public CuboidReducerFunction2(String cubeName, boolean[] needAggr) { + super(cubeName); this.needAggr = needAggr; } @Override public Object[] call(Object[] input1, Object[] input2) throws Exception { + if (initialized == false) { + synchronized (SparkCubingByLayer.class) { + if (initialized == false) { + init(); + initialized = true; + } + } + } Object[] result = new Object[measureNum]; aggregators.aggregate(input1, input2, result, needAggr); return result; @@ -296,30 +386,41 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa private static final 
java.lang.Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR = new ArrayList(0); - class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> { - - String metaUrl; - CubeSegment cubeSegment; - CubeDesc cubeDesc; - CuboidScheduler cuboidScheduler; - NDCuboidBuilder ndCuboidBuilder; - RowKeySplitter rowKeySplitter; - transient boolean initialized = false; - - CuboidFlatMap(String metaUrl, CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) { - this.metaUrl = metaUrl; - this.cubeSegment = cubeSegment; - this.cubeDesc = cubeDesc; - this.cuboidScheduler = cuboidScheduler; - this.ndCuboidBuilder = ndCuboidBuilder; + static class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> { + + private String cubeName; + private String segmentId; + private CubeSegment cubeSegment; + private CubeDesc cubeDesc; + private CuboidScheduler cuboidScheduler; + private NDCuboidBuilder ndCuboidBuilder; + private RowKeySplitter rowKeySplitter; + private volatile transient boolean initialized = false; + + public CuboidFlatMap(String cubeName, String segmentId) { + this.cubeName = cubeName; + this.segmentId = segmentId; + } + + public void init() { + KylinConfig kConfig = getKylinConfigForExecutor(); + CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName); + this.cubeSegment = cubeInstance.getSegmentById(segmentId); + this.cubeDesc = cubeInstance.getDescriptor(); + this.cuboidScheduler = new CuboidScheduler(cubeDesc); + this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment)); this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); } @Override public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception { if (initialized == false) { - loadKylinConfig(metaUrl); - initialized = true; + synchronized (SparkCubingByLayer.class) { + if (initialized == false) { + init(); + initialized = true; + } + } } byte[] key = tuple2._1().array(); @@ -336,7 +437,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size()); for (Long child : myChildren) { Cuboid childCuboid = Cuboid.findById(cubeDesc, child); - Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); + Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, + rowKeySplitter.getSplitBuffers()); byte[] newKey = new byte[result.getFirst()]; System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst()); @@ -349,11 +451,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa } //sanity check - private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int countMeasureIndex) { + private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, + CubeStatsReader cubeStatsReader, final int countMeasureIndex) { int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size(); Long count2 = getRDDCountSum(rdd, countMeasureIndex); if (count2 != totalCount * thisCuboidNum) { - throw new IllegalStateException(String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", thisLevel, count2, thisCuboidNum)); + throw new IllegalStateException( + 
String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", thisLevel, + count2, thisCuboidNum)); } else { logger.info("sanity check success for level " + thisLevel + ", count(*) is " + (count2 / thisCuboidNum)); } @@ -368,7 +473,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa } }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() { @Override - public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22) throws Exception { + public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22) + throws Exception { return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2()); } })._2(); @@ -380,5 +486,5 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa String path = metaUrl.substring(0, cut); HadoopUtil.getFileSystem(path).delete(new Path(path), true); logger.info("Delete metadata in HDFS for this job: " + path); - }; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json index 28a63d5..27acdd3 100644 --- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json +++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json @@ -610,7 +610,7 @@ "status_need_notify": [], "auto_merge_time_ranges": null, "retention_range": 0, - "engine_type": 2, + "engine_type": 4, "storage_type": 2, "override_kylin_properties": { "kylin.cube.algorithm": "LAYER" http://git-wip-us.apache.org/repos/asf/kylin/blob/a1c234a9/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java b/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java index 7e79511..3dde9cf 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java +++ b/server-base/src/main/java/org/apache/kylin/rest/util/Log4jConfigListener.java @@ -35,6 +35,7 @@ public class Log4jConfigListener extends org.springframework.web.util.Log4jConfi if (!isDebugTomcat) { super.contextInitialized(event); } + System.setProperty("needCheckCC", "true"); } @Override