This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 5611760 KYLIN-3368 Move Spark cubing metadata dump to job folder
5611760 is described below
commit 56117606d1518da9740c0f241639866fb0f8c0fb
Author: shaofengshi <[email protected]>
AuthorDate: Fri May 11 21:29:40 2018 +0800
KYLIN-3368 Move Spark cubing metadata dump to job folder
---
.../java/org/apache/kylin/engine/mr/JobBuilderSupport.java | 5 +++++
.../kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java | 4 +---
.../kylin/engine/spark/SparkBatchCubingJobBuilder2.java | 12 ++++--------
.../org/apache/kylin/engine/spark/SparkCubingByLayer.java | 8 ++++----
4 files changed, 14 insertions(+), 15 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 8228f87..8a420df 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -261,4 +261,9 @@ public class JobBuilderSupport {
     public static String getInMemCuboidPath(String cuboidRootPath) {
         return cuboidRootPath + PathNameCuboidInMem;
     }
+
+    public String getDumpMetadataPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/metadata";
+    }
+
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index 018abab..8b478aa 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -39,14 +38,13 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
 
     private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterMergeStep.class);
 
-    private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
     public UpdateCubeInfoAfterMergeStep() {
         super();
     }
 
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
         final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
         CubeSegment mergedSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 7d76ce4..57d4fb0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -67,7 +67,7 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         return "";
     }
 
-    public static void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
+    public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
             final String jobId, final String cuboidRootPath) {
         IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
@@ -75,15 +75,12 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
-                getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
+                getSegmentMetadataUrl(seg.getConfig(), jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
         sparkExecutable.setJobId(jobId);
 
         StringBuilder jars = new StringBuilder();
 
-        StringUtil.appendWithSeparator(jars, findJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.htrace.Trace", null)); // htrace-core.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.cloudera.htrace.HTraceConfiguration", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, findJar("com.yammer.metrics.core.Gauge", null)); // metrics-core.jar
         StringUtil.appendWithSeparator(jars, findJar("com.google.common.collect.Maps", "guava")); //guava.jar
@@ -92,10 +89,9 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
     }
 
-    private static String getSegmentMetadataUrl(KylinConfig kylinConfig, String segmentID) {
+    private String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
         Map<String, String> param = new HashMap<>();
-        param.put("path", kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID);
+        param.put("path", getDumpMetadataPath(jobId));
         return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
-//        return kylinConfig.getHdfsWorkingDirectory() + "metadata/" + segmentID + "@hdfs";
     }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 714991d..76e7e22 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.HadoopUtil;
@@ -488,9 +489,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializable {
     }
 
     protected void deleteHDFSMeta(String metaUrl) throws IOException {
-        int cut = metaUrl.indexOf('@');
-        String path = metaUrl.substring(0, cut);
-        HadoopUtil.getFileSystem(path).delete(new Path(path), true);
-        logger.info("Delete metadata in HDFS for this job: " + path);
+        String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
+        HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
+        logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
     }
 }
--
To stop receiving notification emails like this one, please contact
[email protected].