This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5611760  KYLIN-3368 Move Spark cubing metadata dump to job folder
5611760 is described below

commit 56117606d1518da9740c0f241639866fb0f8c0fb
Author: shaofengshi <[email protected]>
AuthorDate: Fri May 11 21:29:40 2018 +0800

    KYLIN-3368 Move Spark cubing metadata dump to job folder
---
 .../java/org/apache/kylin/engine/mr/JobBuilderSupport.java   |  5 +++++
 .../kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java  |  4 +---
 .../kylin/engine/spark/SparkBatchCubingJobBuilder2.java      | 12 ++++--------
 .../org/apache/kylin/engine/spark/SparkCubingByLayer.java    |  8 ++++----
 4 files changed, 14 insertions(+), 15 deletions(-)

diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 8228f87..8a420df 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -261,4 +261,9 @@ public class JobBuilderSupport {
     public static String getInMemCuboidPath(String cuboidRootPath) {
         return cuboidRootPath + PathNameCuboidInMem;
     }
+
+    public String getDumpMetadataPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/metadata";
+    }
+
 }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 018abab..8b478aa 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -39,14 +38,13 @@ public class UpdateCubeInfoAfterMergeStep extends 
AbstractExecutable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(UpdateCubeInfoAfterMergeStep.class);
 
-    private final CubeManager cubeManager = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
     public UpdateCubeInfoAfterMergeStep() {
         super();
     }
 
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
+        final CubeManager cubeManager = 
CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = 
cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
 
         CubeSegment mergedSegment = 
cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 7d76ce4..57d4fb0 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -67,7 +67,7 @@ public class SparkBatchCubingJobBuilder2 extends 
BatchCubingJobBuilder2 {
         return "";
     }
 
-    public static void configureSparkJob(final CubeSegment seg, final 
SparkExecutable sparkExecutable,
+    public void configureSparkJob(final CubeSegment seg, final SparkExecutable 
sparkExecutable,
             final String jobId, final String cuboidRootPath) {
         IJoinedFlatTableDesc flatTableDesc = 
EngineFactory.getJoinedFlatTableDesc(seg);
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), 
seg.getRealization().getName());
@@ -75,15 +75,12 @@ public class SparkBatchCubingJobBuilder2 extends 
BatchCubingJobBuilder2 {
         
sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
                 seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + 
flatTableDesc.getTableName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
-                getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
+                getSegmentMetadataUrl(seg.getConfig(), jobId));
         
sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), 
cuboidRootPath);
         sparkExecutable.setJobId(jobId);
 
         StringBuilder jars = new StringBuilder();
 
-        StringUtil.appendWithSeparator(jars, 
findJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
-        StringUtil.appendWithSeparator(jars, 
findJar("org.apache.htrace.Trace", null)); // htrace-core.jar
-        StringUtil.appendWithSeparator(jars, 
findJar("org.cloudera.htrace.HTraceConfiguration", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, 
findJar("com.yammer.metrics.core.Gauge", null)); // metrics-core.jar
         StringUtil.appendWithSeparator(jars, 
findJar("com.google.common.collect.Maps", "guava")); //guava.jar
 
@@ -92,10 +89,9 @@ public class SparkBatchCubingJobBuilder2 extends 
BatchCubingJobBuilder2 {
         
sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
     }
 
-    private static String getSegmentMetadataUrl(KylinConfig kylinConfig, 
String segmentID) {
+    private String getSegmentMetadataUrl(KylinConfig kylinConfig, String 
jobId) {
         Map<String, String> param = new HashMap<>();
-        param.put("path", kylinConfig.getHdfsWorkingDirectory() + "metadata/" 
+ segmentID);
+        param.put("path", getDumpMetadataPath(jobId));
         return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), 
"hdfs", param).toString();
-//        return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + 
segmentID + "@hdfs";
     }
 }
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 714991d..76e7e22 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.HadoopUtil;
@@ -488,9 +489,8 @@ public class SparkCubingByLayer extends AbstractApplication 
implements Serializa
     }
 
     protected void deleteHDFSMeta(String metaUrl) throws IOException {
-        int cut = metaUrl.indexOf('@');
-        String path = metaUrl.substring(0, cut);
-        HadoopUtil.getFileSystem(path).delete(new Path(path), true);
-        logger.info("Delete metadata in HDFS for this job: " + path);
+        String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
+        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), 
true);
+        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
     }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to