Repository: kylin
Updated Branches:
  refs/heads/master ee74a74e4 -> 00c8f31e6
KYLIN-2371 rename the prefix to "kylin.engine.spark-conf."

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/00c8f31e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/00c8f31e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/00c8f31e

Branch: refs/heads/master
Commit: 00c8f31e6d235d38bb7919fa4ee5b12e53d840f6
Parents: ee74a74
Author: shaofengshi <shaofeng...@apache.org>
Authored: Tue Jan 10 09:40:44 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Tue Jan 10 09:40:44 2017 +0800

----------------------------------------------------------------------
 build/conf/kylin-spark-conf.properties          | 27 -----------
 build/conf/kylin.properties                     | 48 ++++++++++++--------
 build/deploy/spark-defaults.conf                |  6 +++
 build/script/download-spark.sh                  |  3 +-
 .../apache/kylin/common/KylinConfigBase.java    | 18 +-------
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  2 +-
 .../kylin/engine/spark/SparkCubingByLayer.java  | 10 ++--
 .../kylin/engine/spark/SparkExecutable.java     |  7 ++-
 .../test_case_data/sandbox/kylin.properties     | 19 ++++++--
 9 files changed, 65 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/conf/kylin-spark-conf.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-spark-conf.properties b/build/conf/kylin-spark-conf.properties
deleted file mode 100644
index 143e7e4..0000000
--- a/build/conf/kylin-spark-conf.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-spark.yarn.submit.file.replication=1
-spark.yarn.executor.memoryOverhead=1024
-spark.yarn.driver.memoryOverhead=384
-spark.master=yarn
-spark.submit.deployMode=cluster
-spark.eventLog.enabled=true
-spark.yarn.scheduler.heartbeat.interval-ms=5000
-spark.yarn.preserve.staging.files=true
-spark.yarn.queue=default
-spark.yarn.containerLauncherMaxThreads=25
-spark.yarn.max.executor.failures=3
-spark.eventLog.dir=hdfs\:///kylin/spark-history
-spark.history.kerberos.enabled=true
-spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
-spark.history.ui.port=18080
-spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
-spark.executor.memory=4G
-spark.storage.memoryFraction=0.3
-spark.executor.cores=4
-spark.executor.instances=8
-spark.history.kerberos.keytab=none
-spark.history.kerberos.principal=none
-spark.driver.extraJavaOptions=-Dhdp.version=current
-spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-spark.executor.extraJavaOptions=-Dhdp.version=current
-#spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
-#spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 6efa423..196a711 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -127,24 +127,6 @@ kylin.engine.mr.max-reducer-number=500
 kylin.engine.mr.mapper-input-rows=1000000
 
-
-### Spark Engine ###
-
-# Hadoop conf folder, will export this as "HADOOP_CONF_DIR" before run spark-submit
-#kylin.engine.spark.env.hadoop-conf-dir=/etc/hive/conf
-
-# Spark job submission properties file, default be $KYLIN_HOME/conf/kylin-spark-conf.properties
-#kylin.engine.spark.properties-file=
-
-# Estimate the RDD partition numbers
-kylin.engine.spark.rdd-partition-cut-mb=10
-
-# Minimal partition numbers of rdd
-kylin.engine.spark.min-partition=1
-
-# Max partition numbers of rdd
-kylin.engine.spark.max-partition=5000
-
 ### CUBE | DICTIONARY ###
 
 # 'auto', 'inmem', 'layer' or 'random' for testing
@@ -204,3 +186,33 @@ kylin.security.saml.context-server-name=hostname
 kylin.security.saml.context-server-port=443
 kylin.security.saml.context-path=/kylin
+
+### Spark Engine Configs ###
+
+# Hadoop conf folder; will be exported as "HADOOP_CONF_DIR" before running spark-submit
+#kylin.engine.spark.env.hadoop-conf-dir=/etc/hive/conf
+
+# Estimate the RDD partition numbers
+kylin.engine.spark.rdd-partition-cut-mb=10
+
+# Minimal partition numbers of rdd
+kylin.engine.spark.min-partition=1
+
+# Max partition numbers of rdd
+kylin.engine.spark.max-partition=5000
+
+### Spark conf (default is in spark/conf/spark-defaults.conf)
+kylin.engine.spark-conf.spark.master=yarn
+kylin.engine.spark-conf.spark.submit.deployMode=cluster
+kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=1024
+kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=384
+kylin.engine.spark-conf.spark.yarn.queue=default
+kylin.engine.spark-conf.spark.executor.memory=4G
+kylin.engine.spark-conf.spark.executor.cores=4
+kylin.engine.spark-conf.spark.executor.instances=8
+kylin.engine.spark-conf.spark.storage.memoryFraction=0.3
+kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
+## Manually upload the spark-assembly jar to HDFS and set this property to avoid repeatedly uploading it at runtime
+#kylin.engine.spark-conf.spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/deploy/spark-defaults.conf
----------------------------------------------------------------------
diff --git a/build/deploy/spark-defaults.conf b/build/deploy/spark-defaults.conf
new file mode 100644
index 0000000..36c0ab3
--- /dev/null
+++ b/build/deploy/spark-defaults.conf
@@ -0,0 +1,6 @@
+spark.yarn.submit.file.replication=1
+spark.eventLog.enabled=true
+spark.yarn.max.executor.failures=3
+spark.driver.extraJavaOptions=-Dhdp.version=current
+spark.yarn.am.extraJavaOptions=-Dhdp.version=current
+spark.executor.extraJavaOptions=-Dhdp.version=current

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/build/script/download-spark.sh
----------------------------------------------------------------------
diff --git a/build/script/download-spark.sh b/build/script/download-spark.sh
index dcbcbe7..ad9651d 100755
--- a/build/script/download-spark.sh
+++ b/build/script/download-spark.sh
@@ -48,5 +48,6 @@ mv build/spark-${spark_version}-bin-hadoop2.6 build/spark
 rm -rf build/spark/lib/spark-examples-*
 rm -rf build/spark/examples
 rm -rf build/spark/data
-rm -rf build/spark/python
 rm -rf build/spark/R
+
+cp build/deploy/spark-defaults.conf build/spark/conf/spark-defaults.conf
\ No newline at end of file
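For context on how the renamed entries are consumed: Kylin treats every kylin.engine.spark-conf.* property as a pass-through Spark property, stripping the prefix and handing the remainder to spark-submit; anything not overridden falls back to spark/conf/spark-defaults.conf. A minimal, self-contained sketch of that prefix filtering (the class and method names below are illustrative, not Kylin's actual API):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class SparkConfPrefixDemo {
    static final String PREFIX = "kylin.engine.spark-conf.";

    // Collect every property under the prefix, keyed by the stripped Spark name.
    static Map<String, String> sparkConfOverride(Properties props) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(PREFIX)) {
                conf.put(key.substring(PREFIX.length()), props.getProperty(key));
            }
        }
        return conf;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("kylin.engine.spark-conf.spark.master", "yarn");
        props.setProperty("kylin.engine.spark-conf.spark.executor.memory", "4G");
        props.setProperty("kylin.engine.mr.max-reducer-number", "500"); // ignored: wrong prefix
        // Prints spark.master=yarn and spark.executor.memory=4G (iteration order not guaranteed).
        sparkConfOverride(props).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}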
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 36ddbf4..04051b4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -696,7 +696,7 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     public Map<String, String> getSparkConfigOverride() {
-        return getPropertiesByPrefix("kylin.engine.spark.config-override.");
+        return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
 
     public double getDefaultHadoopJobReducerInputMB() {
@@ -747,22 +747,6 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.engine.spark.env.hadoop-conf-dir", "");
     }
 
-    public String getSparkConfFile() {
-        String conf = getOptional("kylin.engine.spark.properties-file", "conf/kylin-spark-conf.properties");
-        File f = new File(conf);
-        if (f.exists()) {
-            return f.getAbsolutePath();
-        } else {
-            String home = getKylinHome();
-            f = new File(home, conf);
-            if (f.exists()) {
-                return f.getAbsolutePath();
-            }
-        }
-
-        throw new IllegalArgumentException("Spark conf properties file '" + conf + "' does not exist.");
-    }
-
     public String getSparkAdditionalJars() {
         return getOptional("kylin.engine.spark.additional-jars", "");
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index dd866bd..0f604e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -82,7 +82,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
         // n dim cuboid steps
         for (int i = 1; i <= maxLevel; i++) {
-            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i-1), i, jobId));
+            result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
         }
     }
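The BatchCubingJobBuilder2 hunk above also carries a one-argument bug fix unrelated to the rename: both path arguments previously evaluated to level i-1, so each N-dimension cuboid step would have read and written the same directory. A small sketch of the intended chaining, where each level consumes its parent level's output (the path helper below is a hypothetical stand-in for getCuboidOutputPathsByLevel):

public class CuboidLevelPathsDemo {
    // Hypothetical stand-in for BatchCubingJobBuilder2.getCuboidOutputPathsByLevel().
    static String pathForLevel(String root, int level) {
        return root + "/level_" + level + "_cuboid";
    }

    public static void main(String[] args) {
        String cuboidRootPath = "/kylin/cuboid";
        int maxLevel = 3;
        // Level 0 (base cuboid) is written first; each further level reads its parent's output.
        for (int i = 1; i <= maxLevel; i++) {
            System.out.println("step " + i + ": read " + pathForLevel(cuboidRootPath, i - 1)
                    + " -> write " + pathForLevel(cuboidRootPath, i));
        }
    }
}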
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index c989dee..3a664fc 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -255,7 +255,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
-        saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, sc.hadoopConfiguration());
+        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
+        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+
+        saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
 
         // aggregate to ND cuboids
         PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
@@ -267,7 +270,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (kylinConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
-            saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, sc.hadoopConfiguration());
+            saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, confOverwrite);
             allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels - 1].unpersist();
@@ -285,8 +288,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     }
 
     private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
-        conf.set("dfs.replication", "2");
-        final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+        final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
         rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
             BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
             @Override
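The SparkCubingByLayer hunks also move the dfs.replication=2 override out of saveToHDFS(), where it mutated whatever Configuration the caller passed in, onto a private copy created once before the base cuboid is written. The copy-constructor idiom in isolation (a sketch assuming only org.apache.hadoop.conf.Configuration; the variable names mirror the diff):

import org.apache.hadoop.conf.Configuration;

public class ConfCopyDemo {
    public static void main(String[] args) {
        Configuration shared = new Configuration(false); // stands in for sc.hadoopConfiguration()
        shared.set("dfs.replication", "3");

        // Copy first, then override: the shared configuration stays untouched.
        Configuration confOverwrite = new Configuration(shared);
        confOverwrite.set("dfs.replication", "2"); // applies to cuboid intermediate files only

        System.out.println("shared:    " + shared.get("dfs.replication"));        // 3
        System.out.println("overwrite: " + confOverwrite.get("dfs.replication")); // 2
    }
}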
http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index d892060..733a472 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -76,7 +76,6 @@ public class SparkExecutable extends AbstractExecutable {
         final KylinConfig config = context.getConfig();
         Preconditions.checkNotNull(config.getSparkHome());
         Preconditions.checkNotNull(config.getKylinJobJarPath());
-        String sparkConf = config.getSparkConfFile();
         String jars = this.getParam(JARS);
 
         String hadoopConf = "/etc/hadoop/conf";
@@ -105,17 +104,17 @@ public class SparkExecutable extends AbstractExecutable {
         }
 
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry --properties-file %s ");
+        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
 
         Map<String, String> sparkConfs = config.getSparkConfigOverride();
         for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
-            stringBuilder.append(" --conf ").append(entry.getKey()).append("==").append(entry.getValue()).append(" ");
+            stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
         }
 
         stringBuilder.append("--files %s --jars %s %s %s");
         try {
             String cmd = String.format(stringBuilder.toString(),
-                    hadoopConf, config.getSparkHome(), sparkConf, hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
+                    hadoopConf, config.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
             logger.info("cmd:" + cmd);
             final StringBuilder output = new StringBuilder();
             CliCommandExecutor exec = new CliCommandExecutor();

http://git-wip-us.apache.org/repos/asf/kylin/blob/00c8f31e/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index d42e009..06f8e4b 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -160,6 +160,19 @@ kylin.source.hive.keep-flat-table=false
 
 ### Spark as Engine ###
 kylin.engine.spark.env.hadoop-conf-dir=../examples/test_case_data/sandbox
-kylin.engine.spark.spark-home=/usr/local/spark
-kylin.engine.spark.properties-file=../examples/test_case_data/sandbox/kylin-spark-conf.properties
-kylin.engine.spark.sanity-check-enabled=false
\ No newline at end of file
+kylin.engine.spark.sanity-check-enabled=false
+
+### Spark conf overrides for the cube engine
+kylin.engine.spark-conf.spark.master=yarn
+kylin.engine.spark-conf.spark.submit.deployMode=client
+kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=512
+kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=384
+kylin.engine.spark-conf.spark.executor.memory=1G
+kylin.engine.spark-conf.spark.executor.cores=1
+kylin.engine.spark-conf.spark.executor.instances=1
+kylin.engine.spark-conf.spark.storage.memoryFraction=0.3
+kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
+#kylin.engine.spark-conf.spark.yarn.queue=default
+#kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
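One detail worth noting in the SparkExecutable hunk: the override flags were previously emitted as --conf key==value, and spark-submit splits a --conf argument at the first '=', so each value would have arrived with a stray leading '='. A self-contained sketch of the corrected command assembly (the command skeleton is simplified from the diff; the map stands in for getSparkConfigOverride()):

import java.util.LinkedHashMap;
import java.util.Map;

public class SparkSubmitCmdDemo {
    public static void main(String[] args) {
        Map<String, String> sparkConfs = new LinkedHashMap<>(); // stand-in for getSparkConfigOverride()
        sparkConfs.put("spark.master", "yarn");
        sparkConfs.put("spark.executor.memory", "4G");

        StringBuilder cmd = new StringBuilder(
                "spark-submit --class org.apache.kylin.common.util.SparkEntry");
        for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
            // A single '=' between key and value; "==" would fold the extra '=' into the value.
            cmd.append(" --conf ").append(entry.getKey()).append('=').append(entry.getValue());
        }
        System.out.println(cmd);
        // spark-submit --class org.apache.kylin.common.util.SparkEntry
        //   --conf spark.master=yarn --conf spark.executor.memory=4G
    }
}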