Repository: kylin
Updated Branches:
  refs/heads/sparkcubing-rebase 5b7ef354e -> d94434095 (forced update)
KYLIN-2371 Allow overwrite default spark conf at cube level

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d9443409
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d9443409
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d9443409

Branch: refs/heads/sparkcubing-rebase
Commit: d9443409542752e9d85dffccd95e54449e26b297
Parents: 1b04c3c
Author: shaofengshi <shaofeng...@apache.org>
Authored: Mon Jan 9 15:23:22 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Jan 9 15:35:34 2017 +0800

----------------------------------------------------------------------
 build/conf/kylin-spark-conf.properties          |  5 +--
 build/conf/kylin.properties                     |  6 ++--
 .../apache/kylin/common/KylinConfigBase.java    | 12 ++++---
 .../kylin/job/constant/ExecutableConstants.java |  1 +
 .../spark/SparkBatchCubingJobBuilder2.java      |  2 +-
 .../kylin/engine/spark/SparkCubingByLayer.java  |  7 ++--
 .../kylin/engine/spark/SparkExecutable.java     | 38 +++++++++++++++++---
 .../test_case_data/sandbox/kylin.properties     |  3 +-
 8 files changed, 55 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d9443409/build/conf/kylin-spark-conf.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-spark-conf.properties b/build/conf/kylin-spark-conf.properties
index 5e6dafe..143e7e4 100644
--- a/build/conf/kylin-spark-conf.properties
+++ b/build/conf/kylin-spark-conf.properties
@@ -1,5 +1,5 @@
 spark.yarn.submit.file.replication=1
-spark.yarn.executor.memoryOverhead=200
+spark.yarn.executor.memoryOverhead=1024
 spark.yarn.driver.memoryOverhead=384
 spark.master=yarn
 spark.submit.deployMode=cluster
@@ -20,7 +20,8 @@ spark.executor.cores=4
 spark.executor.instances=8
 spark.history.kerberos.keytab=none
 spark.history.kerberos.principal=none
-#spark.yarn.jar=hdfs://namenode:8020/apps/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
 spark.driver.extraJavaOptions=-Dhdp.version=current
 spark.yarn.am.extraJavaOptions=-Dhdp.version=current
 spark.executor.extraJavaOptions=-Dhdp.version=current
+#spark.yarn.jar=hdfs://namenode:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+#spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec

http://git-wip-us.apache.org/repos/asf/kylin/blob/d9443409/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 54430f0..6efa423 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -131,16 +131,16 @@ kylin.engine.mr.mapper-input-rows=1000000
 ### Spark Engine ###
 
 # Hadoop conf folder, will export this as "HADOOP_CONF_DIR" before run spark-submit
-kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf
+#kylin.engine.spark.env.hadoop-conf-dir=/etc/hive/conf
 
 # Spark job submission properties file, default be $KYLIN_HOME/conf/kylin-spark-conf.properties
 #kylin.engine.spark.properties-file=
 
 # Estimate the RDD partition numbers
-kylin.engine.spark.rdd-partition-cut-mb=50
+kylin.engine.spark.rdd-partition-cut-mb=10
 
 # Minimal partition numbers of rdd
-kylin.engine.spark.min-partition=10
+kylin.engine.spark.min-partition=1
 
 # Max partition numbers of rdd
 kylin.engine.spark.max-partition=5000

http://git-wip-us.apache.org/repos/asf/kylin/blob/d9443409/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7d6ac2b..36ddbf4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -695,6 +695,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getPropertiesByPrefix("kylin.engine.mr.config-override.");
     }
 
+    public Map<String, String> getSparkConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.spark.config-override.");
+    }
+
     public double getDefaultHadoopJobReducerInputMB() {
         return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
     }
@@ -739,8 +743,8 @@ abstract public class KylinConfigBase implements Serializable {
     // ENGINE.SPARK
     // ============================================================================
 
-    public String getSparkHadoopConfDir() {
-        return getRequired("kylin.engine.spark.env.hadoop-conf-dir");
+    public String getHadoopConfDir() {
+        return getOptional("kylin.engine.spark.env.hadoop-conf-dir", "");
     }
 
     public String getSparkConfFile() {
@@ -764,7 +768,7 @@
     }
 
     public float getSparkRDDPartitionCutMB() {
-        return Float.valueOf(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "200.0"));
+        return Float.valueOf(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
     }
 
@@ -773,7 +777,7 @@
     }
 
     public int getSparkMaxPartition() {
-        return Integer.valueOf(getOptional("kylin.engine.spark.max-partition", "500"));
+        return Integer.valueOf(getOptional("kylin.engine.spark.max-partition", "5000"));
     }
 
     public boolean isSparkSanityCheckEnabled() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d9443409/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 11c7455..d7f6292 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -40,6 +40,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem";
+    public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark";
     public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid";
     public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
     public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
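The new getSparkConfigOverride() hook collects every property written under the kylin.engine.spark.config-override. prefix (in kylin.properties or in a cube's own override settings) into a map that SparkExecutable later turns into spark-submit "--conf" arguments. A minimal, self-contained sketch of that prefix-scan idea is below; SparkConfOverrideSketch and propertiesByPrefix are illustrative names, not Kylin APIs, and the real getPropertiesByPrefix() may differ in detail.

    // Illustrative sketch only: collect "kylin.engine.spark.config-override.*"
    // entries into a map keyed by the bare Spark property name.
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class SparkConfOverrideSketch {

        static Map<String, String> propertiesByPrefix(Properties props, String prefix) {
            Map<String, String> result = new HashMap<>();
            for (String key : props.stringPropertyNames()) {
                if (key.startsWith(prefix)) {
                    result.put(key.substring(prefix.length()), props.getProperty(key));
                }
            }
            return result;
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            // Hypothetical overrides a user might set in kylin.properties or at cube level.
            props.setProperty("kylin.engine.spark.config-override.spark.executor.memory", "4G");
            props.setProperty("kylin.engine.spark.config-override.spark.executor.instances", "16");

            Map<String, String> overrides = propertiesByPrefix(props, "kylin.engine.spark.config-override.");
            System.out.println(overrides); // e.g. {spark.executor.instances=16, spark.executor.memory=4G}
        }
    }
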
http://git-wip-us.apache.org/repos/asf/kylin/blob/d9443409/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 55e11c4..208a0c9 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -70,7 +70,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
 
         sparkExecutable.setJars(jars.toString());
-        sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE + " with Spark");
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
 
         return sparkExecutable;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d9443409/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 93cce81..c989dee 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -144,7 +144,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
-        SparkConf conf = new SparkConf().setAppName("Cubing Application");
+        SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + ", segment " + segmentId);
         //serialization conf
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrationRequired", "true");
@@ -249,7 +249,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         final int totalLevels = cubeDesc.getBuildLevel();
-        JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels];
+        JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
         int partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
 
@@ -285,6 +285,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     }
 
     private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) {
+        conf.set("dfs.replication", "2");
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
         rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
             BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures());
@@ -403,7 +404,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         Long count = rdd.mapValues(new Function<Object[], Long>() {
             @Override
             public Long call(Object[] objects) throws Exception {
-                return (Long) objects[countMeasureIndex]; // assume the first measure is COUNT(*)
+                return (Long) objects[countMeasureIndex];
             }
         }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() {
             @Override
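The retuned defaults above (kylin.engine.spark.rdd-partition-cut-mb=10, min-partition=1, max-partition=5000) feed the estimateRDDPartitionNum() call used for each cuboid layer. A rough, illustrative sketch of such an estimate is below, assuming it is simply the estimated layer size divided by the cut size and clamped to the min/max bounds; PartitionEstimateSketch and estimatePartitions are made-up names and the real method may compute this differently.

    // Illustrative sketch only: derive a per-layer RDD partition count from
    // the estimated layer size (MB) and the cut/min/max settings.
    public class PartitionEstimateSketch {

        static int estimatePartitions(double layerSizeMB, float cutMB, int minPartition, int maxPartition) {
            int partition = (int) Math.ceil(layerSizeMB / cutMB);
            partition = Math.max(minPartition, partition);   // at least kylin.engine.spark.min-partition
            partition = Math.min(maxPartition, partition);   // at most kylin.engine.spark.max-partition
            return partition;
        }

        public static void main(String[] args) {
            // A 2048 MB layer with a 10 MB cut gives 205 partitions, inside [1, 5000].
            System.out.println(estimatePartitions(2048, 10f, 1, 5000));
        }
    }
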
http://git-wip-us.apache.org/repos/asf/kylin/blob/d9443409/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 644f73f..d892060 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -17,9 +17,10 @@
  */
 package org.apache.kylin.engine.spark;
 
-import java.io.IOException;
+import java.io.File;
 import java.util.Map;
 
+import jodd.util.StringUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.CliCommandExecutor;
@@ -78,14 +79,43 @@
 
         String sparkConf = config.getSparkConfFile();
         String jars = this.getParam(JARS);
 
-        String jobJar = config.getKylinJobJarPath();
+        String hadoopConf = "/etc/hadoop/conf";
+        if (StringUtil.isNotEmpty(config.getHadoopConfDir())) {
+            hadoopConf = config.getHadoopConfDir();
+        } else {
+            String hiveConf = ClassLoader.getSystemClassLoader().getResource("hive-site.xml").getFile().toString();
+            File hiveConfFile = new File(hiveConf);
+            if (hiveConfFile.exists() == true) {
+                logger.info("Locate hive-site.xml in " + hiveConfFile);
+                hadoopConf = hiveConfFile.getParent();
+            }
+        }
+        logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+
+        String hbaseConf = ClassLoader.getSystemClassLoader().getResource("hbase-site.xml").getFile().toString();
+        logger.info("Get hbase-site.xml location from classpath: " + hbaseConf);
+        File hbaseConfFile = new File(hbaseConf);
+        if (hbaseConfFile.exists() == false) {
+            throw new IllegalArgumentException("Couldn't find hbase-site.xml from classpath.");
+        }
+        String jobJar = config.getKylinJobJarPath();
         if (StringUtils.isEmpty(jars)) {
             jars = jobJar;
         }
 
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry --properties-file %s ");
+
+        Map<String, String> sparkConfs = config.getSparkConfigOverride();
+        for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
+            stringBuilder.append(" --conf ").append(entry.getKey()).append("==").append(entry.getValue()).append(" ");
+        }
+
+        stringBuilder.append("--files %s --jars %s %s %s");
         try {
-            String cmd = String.format("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --properties-file %s --jars %s %s %s", config.getSparkHadoopConfDir(), config.getSparkHome(), sparkConf, jars, jobJar, formatArgs());
+            String cmd = String.format(stringBuilder.toString(),
+                    hadoopConf, config.getSparkHome(), sparkConf, hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
             logger.info("cmd:" + cmd);
             final StringBuilder output = new StringBuilder();
             CliCommandExecutor exec = new CliCommandExecutor();
@@ -98,7 +128,7 @@
                 }
             });
             return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-        } catch (IOException e) {
+        } catch (Exception e) {
            logger.error("error run spark job:", e);
             return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
         }
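SparkExecutable now assembles the spark-submit command from a format template, injecting one "--conf" pair per entry returned by getSparkConfigOverride() and shipping hbase-site.xml through --files. A minimal sketch of that assembly is below; SparkSubmitCommandSketch is a made-up name, the paths and trailing arguments are placeholders, and the sketch joins each pair as key=value since that is the form spark-submit expects for --conf.

    // Illustrative sketch only: build a spark-submit command string from a
    // format template plus one "--conf key=value" pair per override entry.
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SparkSubmitCommandSketch {

        public static void main(String[] args) {
            // Hypothetical override map, in the shape getSparkConfigOverride() returns.
            Map<String, String> sparkConfs = new LinkedHashMap<>();
            sparkConfs.put("spark.executor.memory", "4G");
            sparkConfs.put("spark.yarn.queue", "default");

            StringBuilder template = new StringBuilder();
            template.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry --properties-file %s ");
            for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
                template.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
            }
            template.append("--files %s --jars %s %s %s");

            // Placeholder paths and arguments, for illustration only.
            String cmd = String.format(template.toString(),
                    "/etc/hadoop/conf", "/usr/local/spark", "conf/kylin-spark-conf.properties",
                    "/etc/hbase/conf/hbase-site.xml", "kylin-job.jar", "kylin-job.jar",
                    "-className org.apache.kylin.engine.spark.SparkCubingByLayer");
            System.out.println(cmd);
        }
    }
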
http://git-wip-us.apache.org/repos/asf/kylin/blob/d9443409/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index a011911..d42e009 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -156,10 +156,9 @@ kylin.server.query-metrics-percentiles-intervals=60, 360, 3600
 
 # Env DEV|QA|PROD
 kylin.env=DEV
-kylin.source.hive.keep-flat-table=true
+kylin.source.hive.keep-flat-table=false
 
 ### Spark as Engine ###
-#kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf
 kylin.engine.spark.env.hadoop-conf-dir=../examples/test_case_data/sandbox
 kylin.engine.spark.spark-home=/usr/local/spark
 kylin.engine.spark.properties-file=../examples/test_case_data/sandbox/kylin-spark-conf.properties