Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2653 edbe7a52f -> 276fa2c4d (forced update)
KYLIN-2653 Manage the metadata use HDFSResourceStore for Spark Cubing Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d8d0395a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d8d0395a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d8d0395a Branch: refs/heads/KYLIN-2653 Commit: d8d0395a80cc50fcb59bab4d402c7675aef6cd22 Parents: 1a3527c Author: kangkaisen <kangkai...@live.com> Authored: Thu May 25 14:43:02 2017 +0800 Committer: kangkaisen <kangkai...@meituan.com> Committed: Sat Jul 22 14:59:47 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/KylinConfig.java | 35 ++++++++- .../org/apache/kylin/common/KylinConfigExt.java | 2 +- .../engine/mr/common/AbstractHadoopJob.java | 45 +---------- .../engine/mr/common/JobRelatedMetaUtil.java | 71 +++++++++++++++++ .../spark/SparkBatchCubingJobBuilder2.java | 6 +- .../kylin/engine/spark/SparkCubingByLayer.java | 82 ++++++-------------- .../kylin/engine/spark/SparkExecutable.java | 58 +++++++++++--- 7 files changed, 188 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index cc08056..a56e9b8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -36,8 +36,11 @@ import java.util.Properties; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.OrderedProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,10 +117,13 @@ public class KylinConfig extends KylinConfigBase { } public enum UriType { - PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER + PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER, HDFS_FILE } private static UriType decideUriType(String metaUri) { + if (metaUri.indexOf("@hdfs") > 0) { + return UriType.HDFS_FILE; + } try { File file = new File(metaUri); @@ -157,6 +163,23 @@ public class KylinConfig extends KylinConfigBase { */ UriType uriType = decideUriType(uri); + if (uriType == UriType.HDFS_FILE) { + KylinConfig config; + FileSystem fs; + int cut = uri.indexOf('@'); + String realHdfsPath = uri.substring(0, cut) + "/" + KYLIN_CONF_PROPERTIES_FILE; + try { + config = new KylinConfig(); + fs = HadoopUtil.getFileSystem(realHdfsPath); + InputStream is = fs.open(new Path(realHdfsPath)); + Properties prop = streamToProps(is); + config.reloadKylinConfig(prop); + } catch (IOException e) { + throw new RuntimeException(e); + } + return config; + } + if (uriType == UriType.LOCAL_FOLDER) { KylinConfig config = new KylinConfig(); config.setMetadataUrl(uri); @@ -402,6 +425,16 @@ public class KylinConfig extends KylinConfigBase { super(props, force); } + public void writeProperties(Properties props, File file) throws IOException { + FileOutputStream fos = null; + try { + fos = new FileOutputStream(file); + props.store(fos, 
file.getAbsolutePath()); + } finally { + IOUtils.closeQuietly(fos); + } + } + public void writeProperties(File file) throws IOException { FileOutputStream fos = null; try { http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java index d49dee7..786f467 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java @@ -61,7 +61,7 @@ public class KylinConfigExt extends KylinConfig { return super.getOptional(prop, dft); } - protected Properties getAllProperties() { + public Properties getAllProperties() { Properties result = new Properties(); result.putAll(super.getAllProperties()); result.putAll(overrides); http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index f9d9808..fc8fb4e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -56,8 +56,6 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.RawResource; -import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.OptionsHelper; @@ -68,8 +66,6 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TableRef; -import org.apache.kylin.source.SourceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -447,12 +443,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException { - dumpKylinPropsAndMetadata(collectCubeMetadata(cube), cube.getConfig(), conf); + dumpKylinPropsAndMetadata(JobRelatedMetaUtil.collectCubeMetadata(cube), cube.getConfig(), conf); } protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException { Set<String> dumpList = new LinkedHashSet<>(); - dumpList.addAll(collectCubeMetadata(cube)); + dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(cube)); for (CubeSegment segment : cube.getSegments()) { dumpList.addAll(segment.getDictionaryPaths()); } @@ -461,27 +457,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException { Set<String> dumpList = new LinkedHashSet<>(); - dumpList.addAll(collectCubeMetadata(segment.getCubeInstance())); + dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance())); 
dumpList.addAll(segment.getDictionaryPaths()); dumpKylinPropsAndMetadata(dumpList, segment.getConfig(), conf); } - private Set<String> collectCubeMetadata(CubeInstance cube) { - // cube, model_desc, cube_desc, table - Set<String> dumpList = new LinkedHashSet<>(); - dumpList.add(cube.getResourcePath()); - dumpList.add(cube.getDescriptor().getModel().getResourcePath()); - dumpList.add(cube.getDescriptor().getResourcePath()); - - for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) { - TableDesc table = tableRef.getTableDesc(); - dumpList.add(table.getResourcePath()); - dumpList.addAll(SourceFactory.getMRDependentResources(table)); - } - - return dumpList; - } - protected void dumpKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException { File tmp = File.createTempFile("kylin_job_meta", ""); FileUtils.forceDelete(tmp); // we need a directory, so delete the file first @@ -494,7 +474,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { kylinConfig.writeProperties(kylinPropsFile); // write resources - dumpResources(kylinConfig, metaDir, dumpList); + JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList); // hadoop distributed cache String hdfsMetaDir = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()); @@ -530,23 +510,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } } - private void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException { - long startTime = System.currentTimeMillis(); - - ResourceStore from = ResourceStore.getStore(kylinConfig); - KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()); - ResourceStore to = ResourceStore.getStore(localConfig); - for (String path : dumpList) { - RawResource res = from.getResource(path); - if (res == null) - throw new IllegalStateException("No resource found at -- " + path); - to.putResource(path, res.inputStream, res.timestamp); - res.inputStream.close(); - } - - logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime); - } - protected void deletePath(Configuration conf, Path path) throws IOException { HadoopUtil.deletePath(conf, path); } http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java new file mode 100644 index 0000000..46b1d3c --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.RawResource; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.source.SourceFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Set; + +public class JobRelatedMetaUtil { + private static final Logger logger = LoggerFactory.getLogger(JobRelatedMetaUtil.class); + + public static Set<String> collectCubeMetadata(CubeInstance cube) { + // cube, model_desc, cube_desc, table + Set<String> dumpList = new LinkedHashSet<>(); + dumpList.add(cube.getResourcePath()); + dumpList.add(cube.getDescriptor().getModel().getResourcePath()); + dumpList.add(cube.getDescriptor().getResourcePath()); + + for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) { + TableDesc table = tableRef.getTableDesc(); + dumpList.add(table.getResourcePath()); + dumpList.addAll(SourceFactory.getMRDependentResources(table)); + } + + return dumpList; + } + + public static void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException { + long startTime = System.currentTimeMillis(); + + ResourceStore from = ResourceStore.getStore(kylinConfig); + KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()); + ResourceStore to = ResourceStore.getStore(localConfig); + for (String path : dumpList) { + RawResource res = from.getResource(path); + if (res == null) + throw new IllegalStateException("No resource found at -- " + path); + to.putResource(path, res.inputStream, res.timestamp); + res.inputStream.close(); + } + + logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 66b154d..07bc334 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -48,7 +48,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." 
+ flatTableDesc.getTableName()); - sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath()); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), seg.getUuid())); sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath); StringBuilder jars = new StringBuilder(); @@ -84,4 +84,8 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { return ""; } + private String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) { + return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID + "@hdfs"; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 91aa9f7..a8e7378 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.common.util.Pair; @@ -51,7 +50,6 @@ import org.apache.kylin.measure.MeasureAggregators; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.spark.SparkConf; -import org.apache.spark.SparkFiles; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -67,15 +65,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; -import java.io.File; -import java.io.FileFilter; +import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; - /** * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase. 
*/ @@ -85,7 +81,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId"); - public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath"); + public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true).withDescription("HDFS metadata url").create("metaUrl"); public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable"); @@ -96,7 +92,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa options.addOption(OPTION_INPUT_TABLE); options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_SEGMENT_ID); - options.addOption(OPTION_CONF_PATH); + options.addOption(OPTION_META_URL); options.addOption(OPTION_OUTPUT_PATH); } @@ -105,32 +101,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa return options; } - private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception { - ClassUtil.addClasspath(confPath); - final File[] files = new File(confPath).listFiles(new FileFilter() { - @Override - public boolean accept(File pathname) { - if (pathname.getAbsolutePath().endsWith(".xml")) { - return true; - } - if (pathname.getAbsolutePath().endsWith(".properties")) { - return true; - } - return false; - } - }); - for (File file : files) { - sc.addFile(file.getAbsolutePath()); - } - } - - private static final void prepare() { - File file = new File(SparkFiles.get("kylin.properties")); - String confPath = file.getParentFile().getAbsolutePath(); - logger.info("conf directory:" + confPath); - System.setProperty(KylinConfig.KYLIN_CONF, confPath); - ClassUtil.addClasspath(confPath); - + public static KylinConfig loadKylinConfig(String metaUrl) throws IOException { + KylinConfig kylinConfig = KylinConfig.createInstanceFromUri(metaUrl); + KylinConfig.setKylinConfigThreadLocal(kylinConfig); + return kylinConfig; } @Override @@ -138,7 +112,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE); final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); - final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH); + final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL); final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId); @@ -146,30 +120,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.kryo.registrator", 
"org.apache.kylin.engine.spark.KylinKryoRegistrator"); conf.set("spark.kryo.registrationRequired", "true"); - JavaSparkContext sc = new JavaSparkContext(conf); - setupClasspath(sc, confPath); HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath)); - - System.setProperty(KylinConfig.KYLIN_CONF, confPath); - final KylinConfig envConfig = KylinConfig.getInstanceFromEnv(); + final KylinConfig kylinConfig = loadKylinConfig(metaUrl); HiveContext sqlContext = new HiveContext(sc.sc()); final DataFrame intermediateTable = sqlContext.table(hiveTable); - - final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName); + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); final CubeDesc cubeDesc = cubeInstance.getDescriptor(); final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); - final KylinConfig kylinConfig = cubeDesc.getConfig(); final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc); final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment); final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue())); - final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue())); final int measureNum = cubeDesc.getMeasures().size(); - int countMeasureIndex = 0; for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { if (measureDesc.getFunction().isCount() == true) { @@ -186,9 +152,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa allNormalMeasure = allNormalMeasure && needAggr[i]; } logger.info("All measure are normal (agg on all cuboids) ? 
: " + allNormalMeasure); - StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER(); - // encode with dimension encoding, transform to <ByteArray, Object[]> RDD final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() { volatile transient boolean initialized = false; @@ -199,7 +163,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa if (initialized == false) { synchronized (SparkCubingByLayer.class) { if (initialized == false) { - prepare(); + loadKylinConfig(metaUrl); long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); @@ -236,29 +200,23 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa totalCount = encodedBaseRDD.count(); logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count()); } - final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures()); final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators); BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction; if (allNormalMeasure == false) { reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr); } - final int totalLevels = cubeDesc.getBuildLevel(); JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1]; int level = 0; int partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig); - // aggregate to calculate base cuboid allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel); Configuration confOverwrite = new Configuration(sc.hadoopConfiguration()); confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2 - saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite); - // aggregate to ND cuboids - PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder); - + PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(metaUrl, vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder); for (level = 1; level <= totalLevels; level++) { partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig); logger.info("Level " + level + " partition number: " + partition); @@ -271,6 +229,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa } allRDDs[totalLevels].unpersist(); logger.info("Finished on calculating all level cuboids."); + + deleteHDFSMeta(metaUrl); } private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) { @@ -338,6 +298,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> { + String metaUrl; CubeSegment cubeSegment; CubeDesc cubeDesc; CuboidScheduler cuboidScheduler; @@ -345,7 +306,8 @@ public class 
SparkCubingByLayer extends AbstractApplication implements Serializa RowKeySplitter rowKeySplitter; transient boolean initialized = false; - CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) { + CuboidFlatMap(String metaUrl, CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) { + this.metaUrl = metaUrl; this.cubeSegment = cubeSegment; this.cubeDesc = cubeDesc; this.cuboidScheduler = cuboidScheduler; @@ -356,7 +318,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa @Override public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception { if (initialized == false) { - prepare(); + loadKylinConfig(metaUrl); initialized = true; } @@ -387,7 +349,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa } //sanity check - private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int countMeasureIndex) { int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size(); Long count2 = getRDDCountSum(rdd, countMeasureIndex); @@ -413,4 +374,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa })._2(); return count; } + + private void deleteHDFSMeta(String metaUrl) throws IOException { + int cut = metaUrl.indexOf('@'); + String path = metaUrl.substring(0, cut); + HadoopUtil.getFileSystem(path).delete(new Path(path), true); + logger.info("Delete metadata in HDFS for this job: " + path); + }; } http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index 1e032c6..c211ec5 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -18,13 +18,22 @@ package org.apache.kylin.engine.spark; import java.io.File; +import java.io.IOException; +import java.util.LinkedHashSet; import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigExt; +import org.apache.kylin.common.persistence.ResourceTool; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil; import org.apache.kylin.job.common.PatternedLogger; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; @@ -99,19 +108,20 @@ public class SparkExecutable extends AbstractExecutable { } logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR"); - //hbase-site.xml - String hbaseConf = ClassLoader.getSystemClassLoader().getResource("hbase-site.xml").getFile().toString(); - logger.info("Get hbase-site.xml location from classpath: " + hbaseConf); - File hbaseConfFile = new File(hbaseConf); - if (hbaseConfFile.exists() == false) { - throw new 
IllegalArgumentException("Couldn't find hbase-site.xml from classpath."); - } - String jobJar = config.getKylinJobJarPath(); if (StringUtils.isEmpty(jars)) { jars = jobJar; } + String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt()); + CubeSegment segment = cube.getSegmentById(segmentID); + + try { + attachSegmentMetadataWithDict(segment); + } catch (IOException e) { + throw new ExecuteException("meta dump fialed"); + } + StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry "); @@ -120,9 +130,9 @@ public class SparkExecutable extends AbstractExecutable { stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" "); } - stringBuilder.append("--files %s --jars %s %s %s"); + stringBuilder.append("--jars %s %s %s"); try { - String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs()); + String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar, formatArgs()); logger.info("cmd: " + cmd); CliCommandExecutor exec = new CliCommandExecutor(); PatternedLogger patternedLogger = new PatternedLogger(logger); @@ -135,5 +145,33 @@ public class SparkExecutable extends AbstractExecutable { } } + private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException { + Set<String> dumpList = new LinkedHashSet<>(); + dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance())); + dumpList.addAll(segment.getDictionaryPaths()); + dumpList.add(segment.getStatisticsResourcePath()); + dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig()); + } + + private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig) throws IOException { + File tmp = File.createTempFile("kylin_job_meta", ""); + FileUtils.forceDelete(tmp); // we need a directory, so delete the file first + + File metaDir = new File(tmp, "meta"); + metaDir.mkdirs(); + // dump metadata + JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList); + + // write kylin.properties + File kylinPropsFile = new File(metaDir, "kylin.properties"); + Properties properties = kylinConfig.getAllProperties(); + String metadataUrl = this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt()); + properties.setProperty("kylin.metadata.url", metadataUrl); + kylinConfig.writeProperties(properties, kylinPropsFile); + + KylinConfig dstConfig = KylinConfig.createKylinConfig(properties); + //upload metadata + ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig); + } }