This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3137
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit f11aec308119426222a4bc9d9d6a28f551a16111
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Thu May 31 11:17:49 2018 +0800

    KYLIN-3137 Spark cubing refine
---
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |  1 +
 .../engine/spark/SparkBatchCubingJobBuilder2.java  | 11 +++--
 .../kylin/engine/spark/SparkCubingByLayer.java     | 49 +++++++++-------------
 .../apache/kylin/engine/spark/SparkExecutable.java |  5 ---
 4 files changed, 26 insertions(+), 40 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 40f1ac5..7cc7779 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -47,6 +47,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
     public static final String HIVE_NULL = "\\N";
+    public static final String SEQUENCEFILE_DELIMITER = "\\01";
     protected String cubeName;
     protected Cuboid baseCuboid;
     protected CubeDesc cubeDesc;
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 57d4fb0..91690dd 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.slf4j.Logger;
@@ -69,11 +70,12 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
 
     public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
             final String jobId, final String cuboidRootPath) {
-        IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
-                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(),
+                tablePath);
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
                 getSegmentMetadataUrl(seg.getConfig(), jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
@@ -81,9 +83,6 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
 
         StringBuilder jars = new StringBuilder();
 
-        StringUtil.appendWithSeparator(jars, findJar("com.yammer.metrics.core.Gauge", null)); // metrics-core.jar
-        StringUtil.appendWithSeparator(jars, findJar("com.google.common.collect.Maps", "guava")); //guava.jar
-
         StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
         sparkExecutable.setJars(jars.toString());
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 76e7e22..e6d478e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -30,12 +30,14 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.Pair;
@@ -71,15 +73,14 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.Tuple2;
 
+import static org.apache.kylin.engine.mr.common.BaseCuboidBuilder.SEQUENCEFILE_DELIMITER;
+
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
@@ -95,14 +96,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .withDescription("HDFS metadata url").create("metaUrl");
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
             .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
-    public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
-            .withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true)
+            .withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
 
     private Options options;
 
     public SparkCubingByLayer() {
         options = new Options();
-        options.addOption(OPTION_INPUT_TABLE);
+        options.addOption(OPTION_INPUT_PATH);
         options.addOption(OPTION_CUBE_NAME);
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
@@ -117,7 +118,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
-        String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
+        String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
@@ -145,6 +146,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
         final Job job = Job.getInstance(confOverwrite);
 
+        logger.info("RDD input path: {}", inputPath);
         logger.info("RDD Output path: {}", outputPath);
 
         setHadoopConf(job, cubeSegment, metaUrl);
@@ -166,12 +168,15 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
 
-        HiveContext sqlContext = new HiveContext(sc.sc());
-        final Dataset intermediateTable = sqlContext.table(hiveTable);
-
-        // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
-        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
-                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
+        final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = sc
+                .sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                .map(new Function<Text, String[]>() {
+                    @Override
+                    public String[] call(Text text) throws Exception {
+                        String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                        return s.split(SEQUENCEFILE_DELIMITER);
+                    }
+                }).mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
@@ -269,7 +274,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
         }
 
-    static public class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
+    static public class EncodeBaseCuboid implements PairFunction<String[], ByteArray, Object[]> {
         private volatile transient boolean initialized = false;
         private BaseCuboidBuilder baseCuboidBuilder = null;
         private String cubeName;
@@ -285,7 +290,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         @Override
-        public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
+        public Tuple2<ByteArray, Object[]> call(String[] rowArray) throws Exception {
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
@@ -304,25 +309,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                     }
                 }
             }
-            String[] rowArray = rowToArray(row);
             baseCuboidBuilder.resetAggrs();
             byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
             Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
             return new Tuple2<>(new ByteArray(rowKey), result);
         }
-
-        private String[] rowToArray(Row row) {
-            String[] result = new String[row.size()];
-            for (int i = 0; i < row.size(); i++) {
-                final Object o = row.get(i);
-                if (o != null) {
-                    result[i] = o.toString();
-                } else {
-                    result[i] = null;
-                }
-            }
-            return result;
-        }
     }
 
     static public class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 69232ba..8de78c0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -112,11 +112,6 @@ public class SparkExecutable extends AbstractExecutable {
                     "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
         }
 
-        File hiveConfFile = new File(hadoopConf, "hive-site.xml");
-        if (!hiveConfFile.exists()) {
-            throw new RuntimeException("Cannot find hive-site.xml in kylin_hadoop_conf_dir: " + hadoopConf + //
-                    ". In order to enable spark cubing, you must set kylin.env.hadoop-conf-dir to a dir which contains at least core-site.xml, hdfs-site.xml, hive-site.xml, mapred-site.xml, yarn-site.xml");
-        }
         logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
 
         String jobJar = config.getKylinJobJarPath();

-- 
To stop receiving notification emails like this one, please contact
shaofeng...@apache.org.