KYLIN-2811, fix NPE

Signed-off-by: Hongbin Ma <mahong...@apache.org>
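[Editor's note on the root cause, for context: metaUrl was a static field, assigned in execute() on the Spark driver and read back via getKylinConfigForExecutor() inside executor-side closures. Static fields are not serialized with a task closure, so each executor JVM sees its own never-assigned copy and dereferences null. A minimal standalone sketch of the failure mode follows; the class name and values are hypothetical, not Kylin code.]

    // Hypothetical repro of the failure mode this commit fixes (not Kylin code).
    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class StaticFieldNpeDemo {
        private static String metaUrl; // assigned on the driver only

        public static void main(String[] args) {
            metaUrl = "kylin_metadata@hbase"; // visible in the driver JVM

            try (JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("npe-demo"))) {
                // The lambda references the static field, but statics are not
                // part of the serialized closure: on a real cluster each
                // executor JVM reads its own copy, which is still null -> NPE.
                // (Local-mode runs share one JVM, so the bug hides there.)
                sc.parallelize(Arrays.asList(1, 2, 3))
                        .map(i -> metaUrl.length() + i) // NPE on executors
                        .collect();
            }
        }
    }

[The patch applies the standard remedy: metaUrl becomes a constructor argument and instance field of each serializable function, so it travels with the closure and is available when getKylinConfigForExecutor(metaUrl) runs on the executor.]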
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/727920b4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/727920b4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/727920b4

Branch: refs/heads/2622-2764
Commit: 727920b4d3642aaa3657d90b7f3dce7dcdd68fe2
Parents: a1c234a
Author: Cheng Wang <cheng.w...@kyligence.io>
Authored: Mon Aug 28 12:47:58 2017 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Mon Aug 28 14:41:26 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/spark/SparkCubingByLayer.java  | 43 +++++++++++---------
 1 file changed, 24 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/727920b4/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index a3a6ad0..a03e238 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -100,7 +100,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .withDescription("Configuration Path").create("confPath");
 
     private Options options;
-    private static String metaUrl;
 
     public SparkCubingByLayer() {
         options = new Options();
@@ -117,7 +116,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return options;
     }
 
-    public static KylinConfig getKylinConfigForExecutor() {
+    public static KylinConfig getKylinConfigForExecutor(String metaUrl) {
         File file = new File(SparkFiles.get(KylinConfig.KYLIN_CONF_PROPERTIES_FILE));
         String confPath = file.getParentFile().getAbsolutePath();
         System.setProperty(KylinConfig.KYLIN_CONF, confPath);
@@ -128,7 +127,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
-        metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
         String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
@@ -178,17 +177,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
-                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId));
+                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl));
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
             totalCount = encodedBaseRDD.count();
         }
 
-        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName);
+        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl);
         BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
         if (allNormalMeasure == false) {
-            reducerFunction2 = new CuboidReducerFunction2(cubeName, needAggr);
+            reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, needAggr);
         }
 
         final int totalLevels = cubeDesc.getBuildLevel();
@@ -202,18 +201,18 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
         confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
 
-        saveToHDFS(allRDDs[0], cubeName, cubeSegment, outputPath, 0, confOverwrite);
+        saveToHDFS(allRDDs[0], cubeName, metaUrl, cubeSegment, outputPath, 0, confOverwrite);
 
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
             long levelRddSize = SizeEstimator.estimate(allRDDs[level - 1]) / (1024 * 1024);
             partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) levelRddSize);
-            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId))
+            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
-            saveToHDFS(allRDDs[level], cubeName, cubeSegment, outputPath, level, confOverwrite);
+            saveToHDFS(allRDDs[level], cubeName, metaUrl, cubeSegment, outputPath, level, confOverwrite);
             allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels].unpersist();
@@ -231,7 +230,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return partition;
     }
 
-    private void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String cubeName,
+    private void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String cubeName, final String metaUrl,
             final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Configuration conf) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
@@ -256,7 +255,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                 if (!initialized) {
                     synchronized (SparkCubingByLayer.class) {
                         if (!initialized) {
-                            KylinConfig kylinConfig = getKylinConfigForExecutor();
+                            KylinConfig kylinConfig = getKylinConfigForExecutor(metaUrl);
                             CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
                             codec = new BufferedMeasureCodec(desc.getMeasures());
                             initialized = true;
@@ -278,10 +277,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private BaseCuboidBuilder baseCuboidBuilder = null;
         private String cubeName;
         private String segmentId;
+        private String metaurl;
 
-        public EncodeBaseCuboid(String cubeName, String segmentId) {
+        public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
+            this.metaurl = metaurl;
         }
 
         @Override
@@ -289,7 +290,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
-                        KylinConfig kConfig = getKylinConfigForExecutor();
+                        KylinConfig kConfig = getKylinConfigForExecutor(metaurl);
                         CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
                         CubeDesc cubeDesc = cubeInstance.getDescriptor();
                         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -327,17 +328,19 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     static class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
         protected String cubeName;
+        protected String metaUrl;
         protected CubeDesc cubeDesc;
         protected int measureNum;
         protected MeasureAggregators aggregators;
         protected volatile transient boolean initialized = false;
 
-        public BaseCuboidReducerFunction2(String cubeName) {
+        public BaseCuboidReducerFunction2(String cubeName, String metaUrl) {
             this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor();
+            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             cubeDesc = cubeInstance.getDescriptor();
             aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -363,8 +366,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     static class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
         private boolean[] needAggr;
 
-        public CuboidReducerFunction2(String cubeName, boolean[] needAggr) {
-            super(cubeName);
+        public CuboidReducerFunction2(String cubeName, String metaUrl, boolean[] needAggr) {
+            super(cubeName, metaUrl);
             this.needAggr = needAggr;
         }
 
@@ -390,6 +393,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         private String cubeName;
         private String segmentId;
+        private String metaUrl;
         private CubeSegment cubeSegment;
         private CubeDesc cubeDesc;
         private CuboidScheduler cuboidScheduler;
@@ -397,13 +401,14 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private RowKeySplitter rowKeySplitter;
         private volatile transient boolean initialized = false;
 
-        public CuboidFlatMap(String cubeName, String segmentId) {
+        public CuboidFlatMap(String cubeName, String segmentId, String metaUrl) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
+            this.metaUrl = metaUrl;
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor();
+            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();
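[Editor's note on a design detail visible above: even with metaUrl shipped in the closure, the heavyweight executor-side state (CubeDesc, codecs, aggregators) is still built lazily once per JVM, guarded by a volatile transient flag and double-checked locking. A condensed sketch of that pattern, with simplified hypothetical names rather than the literal Kylin code:]

    // Condensed sketch of the executor-side lazy-init pattern (not Kylin code).
    static class ExecutorSideFunc implements java.io.Serializable {
        private final String metaUrl;                   // shipped with the closure
        private volatile transient boolean initialized; // transient: every executor
                                                        // JVM starts uninitialized

        ExecutorSideFunc(String metaUrl) {
            this.metaUrl = metaUrl;
        }

        void ensureInit() {
            if (!initialized) {                         // cheap volatile fast path
                synchronized (ExecutorSideFunc.class) { // one thread does the setup
                    if (!initialized) {
                        // heavyweight setup would go here, e.g.:
                        // KylinConfig config = getKylinConfigForExecutor(metaUrl);
                        initialized = true;             // volatile write publishes
                    }                                   // all prior writes
                }
            }
        }
    }

[transient keeps the driver's value of the flag out of the serialized task, so each executor re-runs the setup; volatile makes the double-checked lock safe under the Java memory model, since a thread that reads initialized == true is guaranteed to see every write made before the flag was set.]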