Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2653 edbe7a52f -> 276fa2c4d (forced update)


KYLIN-2653 Manage the metadata using HDFSResourceStore for Spark cubing


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d8d0395a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d8d0395a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d8d0395a

Branch: refs/heads/KYLIN-2653
Commit: d8d0395a80cc50fcb59bab4d402c7675aef6cd22
Parents: 1a3527c
Author: kangkaisen <kangkai...@live.com>
Authored: Thu May 25 14:43:02 2017 +0800
Committer: kangkaisen <kangkai...@meituan.com>
Committed: Sat Jul 22 14:59:47 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    | 35 ++++++++-
 .../org/apache/kylin/common/KylinConfigExt.java |  2 +-
 .../engine/mr/common/AbstractHadoopJob.java     | 45 +----------
 .../engine/mr/common/JobRelatedMetaUtil.java    | 71 +++++++++++++++++
 .../spark/SparkBatchCubingJobBuilder2.java      |  6 +-
 .../kylin/engine/spark/SparkCubingByLayer.java  | 82 ++++++--------------
 .../kylin/engine/spark/SparkExecutable.java     | 58 +++++++++++---
 7 files changed, 188 insertions(+), 111 deletions(-)
----------------------------------------------------------------------

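In short, the patch stops shipping a local conf directory to the Spark job (the old "--files" / "-confPath" mechanism) and instead dumps the segment's metadata to HDFS, handing the job a metadata URL of the form "<hdfs-dir>@hdfs". A rough sketch of the hand-off, with the path below purely illustrative:

    // 1. SparkExecutable dumps the segment metadata plus a kylin.properties whose
    //    kylin.metadata.url points at HDFS, then uploads it via ResourceTool.copy().
    // 2. The Spark driver and executors rebuild their KylinConfig from the
    //    "-metaUrl" argument instead of a local conf path:
    String metaUrl = "hdfs:///kylin/kylin_metadata/metadata/<segment-id>@hdfs"; // illustrative
    KylinConfig config = KylinConfig.createInstanceFromUri(metaUrl);
    KylinConfig.setKylinConfigThreadLocal(config);
    // 3. SparkCubingByLayer deletes the HDFS metadata directory once the cube
    //    build finishes (deleteHDFSMeta).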

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index cc08056..a56e9b8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -36,8 +36,11 @@ import java.util.Properties;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OrderedProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,10 +117,13 @@ public class KylinConfig extends KylinConfigBase {
     }
 
     public enum UriType {
-        PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER
+        PROPERTIES_FILE, REST_ADDR, LOCAL_FOLDER, HDFS_FILE
     }
 
     private static UriType decideUriType(String metaUri) {
+        if (metaUri.indexOf("@hdfs") > 0) {
+            return UriType.HDFS_FILE;
+        }
 
         try {
             File file = new File(metaUri);
@@ -157,6 +163,23 @@ public class KylinConfig extends KylinConfigBase {
          */
         UriType uriType = decideUriType(uri);
 
+        if (uriType == UriType.HDFS_FILE) {
+            KylinConfig config;
+            FileSystem fs;
+            int cut = uri.indexOf('@');
+            String realHdfsPath = uri.substring(0, cut) + "/" + KYLIN_CONF_PROPERTIES_FILE;
+            try {
+                config = new KylinConfig();
+                fs = HadoopUtil.getFileSystem(realHdfsPath);
+                InputStream is = fs.open(new Path(realHdfsPath));
+                Properties prop = streamToProps(is);
+                config.reloadKylinConfig(prop);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return config;
+        }
+
         if (uriType == UriType.LOCAL_FOLDER) {
             KylinConfig config = new KylinConfig();
             config.setMetadataUrl(uri);
@@ -402,6 +425,16 @@ public class KylinConfig extends KylinConfigBase {
         super(props, force);
     }
 
+    public void writeProperties(Properties props, File file) throws IOException {
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(file);
+            props.store(fos, file.getAbsolutePath());
+        } finally {
+            IOUtils.closeQuietly(fos);
+        }
+    }
+
     public void writeProperties(File file) throws IOException {
         FileOutputStream fos = null;
         try {

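With the new HDFS_FILE uri type, any metadata url ending in "@hdfs" makes createInstanceFromUri() read kylin.properties straight from that HDFS directory instead of a local folder or REST address. A minimal usage sketch; the directory name is an assumption and only needs to contain a kylin.properties reachable through HadoopUtil.getFileSystem():

    // decideUriType() spots the "@hdfs" suffix and returns UriType.HDFS_FILE,
    // so the config is built from <dir>/kylin.properties on HDFS.
    String uri = "hdfs:///kylin/kylin_metadata/metadata/0d3cf8e7@hdfs"; // illustrative
    KylinConfig jobConfig = KylinConfig.createInstanceFromUri(uri);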
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
index d49dee7..786f467 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
@@ -61,7 +61,7 @@ public class KylinConfigExt extends KylinConfig {
             return super.getOptional(prop, dft);
     }
 
-    protected Properties getAllProperties() {
+    public Properties getAllProperties() {
         Properties result = new Properties();
         result.putAll(super.getAllProperties());
         result.putAll(overrides);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index f9d9808..fc8fb4e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -56,8 +56,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -68,8 +66,6 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -447,12 +443,12 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }
 
     protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        dumpKylinPropsAndMetadata(collectCubeMetadata(cube), cube.getConfig(), conf);
+        dumpKylinPropsAndMetadata(JobRelatedMetaUtil.collectCubeMetadata(cube), cube.getConfig(), conf);
     }
 
     protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
-        dumpList.addAll(collectCubeMetadata(cube));
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(cube));
         for (CubeSegment segment : cube.getSegments()) {
             dumpList.addAll(segment.getDictionaryPaths());
         }
@@ -461,27 +457,11 @@ public abstract class AbstractHadoopJob extends 
Configured implements Tool {
 
     protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
-        dumpList.addAll(collectCubeMetadata(segment.getCubeInstance()));
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
         dumpList.addAll(segment.getDictionaryPaths());
         dumpKylinPropsAndMetadata(dumpList, segment.getConfig(), conf);
     }
 
-    private Set<String> collectCubeMetadata(CubeInstance cube) {
-        // cube, model_desc, cube_desc, table
-        Set<String> dumpList = new LinkedHashSet<>();
-        dumpList.add(cube.getResourcePath());
-        dumpList.add(cube.getDescriptor().getModel().getResourcePath());
-        dumpList.add(cube.getDescriptor().getResourcePath());
-
-        for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = tableRef.getTableDesc();
-            dumpList.add(table.getResourcePath());
-            dumpList.addAll(SourceFactory.getMRDependentResources(table));
-        }
-
-        return dumpList;
-    }
-
     protected void dumpKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
         FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
@@ -494,7 +474,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         kylinConfig.writeProperties(kylinPropsFile);
 
         // write resources
-        dumpResources(kylinConfig, metaDir, dumpList);
+        JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
 
         // hadoop distributed cache
         String hdfsMetaDir = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath());
@@ -530,23 +510,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
-    private void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException {
-        long startTime = System.currentTimeMillis();
-
-        ResourceStore from = ResourceStore.getStore(kylinConfig);
-        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
-        ResourceStore to = ResourceStore.getStore(localConfig);
-        for (String path : dumpList) {
-            RawResource res = from.getResource(path);
-            if (res == null)
-                throw new IllegalStateException("No resource found at -- " + path);
-            to.putResource(path, res.inputStream, res.timestamp);
-            res.inputStream.close();
-        }
-
-        logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime);
-    }
-
     protected void deletePath(Configuration conf, Path path) throws IOException {
         HadoopUtil.deletePath(conf, path);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
new file mode 100644
index 0000000..46b1d3c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.source.SourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+public class JobRelatedMetaUtil {
+    private static final Logger logger = LoggerFactory.getLogger(JobRelatedMetaUtil.class);
+
+    public static Set<String> collectCubeMetadata(CubeInstance cube) {
+        // cube, model_desc, cube_desc, table
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.add(cube.getResourcePath());
+        dumpList.add(cube.getDescriptor().getModel().getResourcePath());
+        dumpList.add(cube.getDescriptor().getResourcePath());
+
+        for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
+            TableDesc table = tableRef.getTableDesc();
+            dumpList.add(table.getResourcePath());
+            dumpList.addAll(SourceFactory.getMRDependentResources(table));
+        }
+
+        return dumpList;
+    }
+
+    public static void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException {
+        long startTime = System.currentTimeMillis();
+
+        ResourceStore from = ResourceStore.getStore(kylinConfig);
+        KylinConfig localConfig = 
KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
+        ResourceStore to = ResourceStore.getStore(localConfig);
+        for (String path : dumpList) {
+            RawResource res = from.getResource(path);
+            if (res == null)
+                throw new IllegalStateException("No resource found at -- " + path);
+            to.putResource(path, res.inputStream, res.timestamp);
+            res.inputStream.close();
+        }
+
+        logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime);
+    }
+}

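JobRelatedMetaUtil simply centralizes what AbstractHadoopJob previously did in private methods, so the MR and Spark engines can share the same metadata dump. A hypothetical caller (cube name and target directory below are illustrative, not from the patch):

    KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
    CubeInstance cube = CubeManager.getInstance(srcConfig).getCube("sample_cube"); // illustrative name
    Set<String> dumpList = JobRelatedMetaUtil.collectCubeMetadata(cube);  // cube, model, cube_desc, tables
    File metaDir = new File("/tmp/kylin_job_meta/meta");                  // illustrative path
    metaDir.mkdirs();
    // copies every listed resource from the source store into a local
    // file-based ResourceStore rooted at metaDir
    JobRelatedMetaUtil.dumpResources(srcConfig, metaDir, dumpList);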
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 66b154d..07bc334 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -48,7 +48,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfPath());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
 
         StringBuilder jars = new StringBuilder();
@@ -84,4 +84,8 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         return "";
     }
 
+    private String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) {
+        return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID + "@hdfs";
+    }
+
 }

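The metadata url handed to the Spark step is just the segment's directory under the HDFS working directory with the "@hdfs" marker appended; for a working directory of hdfs:///kylin/kylin_metadata/ (illustrative) it would resolve to something like the following:

    // getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()) builds
    //   <kylin.env.hdfs-working-dir> + "metadata/" + segmentID + "@hdfs"
    // e.g. hdfs:///kylin/kylin_metadata/metadata/0d3cf8e7-abcd-4321-ab12@hdfs (illustrative)
    String metaUrl = seg.getConfig().getHdfsWorkingDirectory() + "metadata/" + seg.getUuid() + "@hdfs";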
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 91aa9f7..a8e7378 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.Pair;
@@ -51,7 +50,6 @@ import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -67,15 +65,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
-import java.io.File;
-import java.io.FileFilter;
+import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
@@ -85,7 +81,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
     public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
-    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true).withDescription("HDFS metadata url").create("metaUrl");
     public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
 
@@ -96,7 +92,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_INPUT_TABLE);
         options.addOption(OPTION_CUBE_NAME);
         options.addOption(OPTION_SEGMENT_ID);
-        options.addOption(OPTION_CONF_PATH);
+        options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
     }
 
@@ -105,32 +101,10 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return options;
     }
 
-    private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
-        ClassUtil.addClasspath(confPath);
-        final File[] files = new File(confPath).listFiles(new FileFilter() {
-            @Override
-            public boolean accept(File pathname) {
-                if (pathname.getAbsolutePath().endsWith(".xml")) {
-                    return true;
-                }
-                if (pathname.getAbsolutePath().endsWith(".properties")) {
-                    return true;
-                }
-                return false;
-            }
-        });
-        for (File file : files) {
-            sc.addFile(file.getAbsolutePath());
-        }
-    }
-
-    private static final void prepare() {
-        File file = new File(SparkFiles.get("kylin.properties"));
-        String confPath = file.getParentFile().getAbsolutePath();
-        logger.info("conf directory:" + confPath);
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        ClassUtil.addClasspath(confPath);
-
+    public static KylinConfig loadKylinConfig(String metaUrl) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.createInstanceFromUri(metaUrl);
+        KylinConfig.setKylinConfigThreadLocal(kylinConfig);
+        return kylinConfig;
     }
 
     @Override
@@ -138,7 +112,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
-        final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
         SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId);
@@ -146,30 +120,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true");
-
         JavaSparkContext sc = new JavaSparkContext(conf);
-        setupClasspath(sc, confPath);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        final KylinConfig kylinConfig = loadKylinConfig(metaUrl);
 
         HiveContext sqlContext = new HiveContext(sc.sc());
         final DataFrame intermediateTable = sqlContext.table(hiveTable);
-
-        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
-        final KylinConfig kylinConfig = cubeDesc.getConfig();
         final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc);
         final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment);
         final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue()));
-
         final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue()));
         final int measureNum = cubeDesc.getMeasures().size();
-
         int countMeasureIndex = 0;
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
             if (measureDesc.getFunction().isCount() == true) {
@@ -186,9 +152,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             allNormalMeasure = allNormalMeasure && needAggr[i];
         }
         logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
-
         StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();
-
         // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() {
             volatile transient boolean initialized = false;
@@ -199,7 +163,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                 if (initialized == false) {
                     synchronized (SparkCubingByLayer.class) {
                         if (initialized == false) {
-                            prepare();
+                            loadKylinConfig(metaUrl);
                             long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
                             Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
                             baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
@@ -236,29 +200,23 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             totalCount = encodedBaseRDD.count();
             logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count());
         }
-
         final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures());
         final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators);
         BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
         if (allNormalMeasure == false) {
             reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr);
         }
-
         final int totalLevels = cubeDesc.getBuildLevel();
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
         int partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
-
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
         Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
         confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
-
         saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, confOverwrite);
-
         // aggregate to ND cuboids
-        PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
-
+        PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(metaUrl, vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder);
         for (level = 1; level <= totalLevels; level++) {
             partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig);
             logger.info("Level " + level + " partition number: " + partition);
@@ -271,6 +229,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
         allRDDs[totalLevels].unpersist();
         logger.info("Finished on calculating all level cuboids.");
+
+        deleteHDFSMeta(metaUrl);
     }
 
     private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
@@ -338,6 +298,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
 
+        String metaUrl;
         CubeSegment cubeSegment;
         CubeDesc cubeDesc;
         CuboidScheduler cuboidScheduler;
@@ -345,7 +306,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         RowKeySplitter rowKeySplitter;
         transient boolean initialized = false;
 
-        CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
+        CuboidFlatMap(String metaUrl, CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) {
+            this.metaUrl = metaUrl;
             this.cubeSegment = cubeSegment;
             this.cubeDesc = cubeDesc;
             this.cuboidScheduler = cuboidScheduler;
@@ -356,7 +318,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         @Override
         public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
             if (initialized == false) {
-                prepare();
+                loadKylinConfig(metaUrl);
                 initialized = true;
             }
 
@@ -387,7 +349,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     }
 
     //sanity check
-
     private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
         int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
         Long count2 = getRDDCountSum(rdd, countMeasureIndex);
@@ -413,4 +374,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         })._2();
         return count;
     }
+
+    private void deleteHDFSMeta(String metaUrl) throws IOException {
+        int cut = metaUrl.indexOf('@');
+        String path = metaUrl.substring(0, cut);
+        HadoopUtil.getFileSystem(path).delete(new Path(path), true);
+        logger.info("Delete metadata in HDFS for this job: " + path);
+    };
 }

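On the executor side, the old prepare() (which depended on files shipped via "--files" and SparkFiles) is replaced by a lazy, thread-safe rebuild of KylinConfig from the metadata url, and the driver removes the HDFS dump once the build is done. A condensed sketch of both, using the same calls as the patch (variable names taken from the surrounding code):

    // inside a task, first invocation only (double-checked locking as above)
    if (!initialized) {
        synchronized (SparkCubingByLayer.class) {
            if (!initialized) {
                KylinConfig config = KylinConfig.createInstanceFromUri(metaUrl);
                KylinConfig.setKylinConfigThreadLocal(config);
                initialized = true;
            }
        }
    }

    // driver-side cleanup after all cuboid levels are written (deleteHDFSMeta)
    String path = metaUrl.substring(0, metaUrl.indexOf('@'));
    HadoopUtil.getFileSystem(path).delete(new Path(path), true);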
http://git-wip-us.apache.org/repos/asf/kylin/blob/d8d0395a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 1e032c6..c211ec5 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -18,13 +18,22 @@
 package org.apache.kylin.engine.spark;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -99,19 +108,20 @@ public class SparkExecutable extends AbstractExecutable {
         }
         logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
 
-        //hbase-site.xml
-        String hbaseConf = ClassLoader.getSystemClassLoader().getResource("hbase-site.xml").getFile().toString();
-        logger.info("Get hbase-site.xml location from classpath: " + hbaseConf);
-        File hbaseConfFile = new File(hbaseConf);
-        if (hbaseConfFile.exists() == false) {
-            throw new IllegalArgumentException("Couldn't find hbase-site.xml from classpath.");
-        }
-
         String jobJar = config.getKylinJobJarPath();
         if (StringUtils.isEmpty(jars)) {
             jars = jobJar;
         }
 
+        String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
+        CubeSegment segment = cube.getSegmentById(segmentID);
+
+        try {
+            attachSegmentMetadataWithDict(segment);
+        } catch (IOException e) {
+            throw new ExecuteException("meta dump failed");
+        }
+
         StringBuilder stringBuilder = new StringBuilder();
         stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
 
@@ -120,9 +130,9 @@ public class SparkExecutable extends AbstractExecutable {
             stringBuilder.append(" --conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
         }
 
-        stringBuilder.append("--files %s --jars %s %s %s");
+        stringBuilder.append("--jars %s %s %s");
         try {
-            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs());
+            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar, formatArgs());
             logger.info("cmd: " + cmd);
             CliCommandExecutor exec = new CliCommandExecutor();
             PatternedLogger patternedLogger = new PatternedLogger(logger);
@@ -135,5 +145,33 @@ public class SparkExecutable extends AbstractExecutable {
         }
     }
 
+    private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
+        dumpList.addAll(segment.getDictionaryPaths());
+        dumpList.add(segment.getStatisticsResourcePath());
+        dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig());
+    }
+
+    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig) throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
+
+        File metaDir = new File(tmp, "meta");
+        metaDir.mkdirs();
 
+        // dump metadata
+        JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
+
+        // write kylin.properties
+        File kylinPropsFile = new File(metaDir, "kylin.properties");
+        Properties properties = kylinConfig.getAllProperties();
+        String metadataUrl = this.getParam(SparkCubingByLayer.OPTION_META_URL.getOpt());
+        properties.setProperty("kylin.metadata.url", metadataUrl);
+        kylinConfig.writeProperties(properties, kylinPropsFile);
+
+        KylinConfig dstConfig = KylinConfig.createKylinConfig(properties);
+        //upload metadata
+        ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+    }
 }

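Putting the SparkExecutable side together: the segment's metadata (including dictionaries and statistics) is dumped to a local temp directory, kylin.properties is rewritten so kylin.metadata.url points at the HDFS location, and the whole dump is uploaded there with ResourceTool. A condensed sketch of that flow (the temp path is illustrative; dumpList, segment and metaUrl come from the surrounding code as in the patch):

    File metaDir = new File("/tmp/kylin_job_meta0001/meta"); // illustrative
    metaDir.mkdirs();
    JobRelatedMetaUtil.dumpResources(segment.getConfig(), metaDir, dumpList); // local dump
    Properties props = ((KylinConfigExt) segment.getConfig()).getAllProperties();
    props.setProperty("kylin.metadata.url", metaUrl); // point the Spark job at HDFS
    segment.getConfig().writeProperties(props, new File(metaDir, "kylin.properties"));
    // upload the local dump into the HDFS-backed store described by props
    ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()),
            KylinConfig.createKylinConfig(props));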