Repository: kylin
Updated Branches:
  refs/heads/master 9265e150d -> c6cfa6984
KYLIN-2995 Set SparkContext.hadoopConfiguration to HadoopUtil in Spark Cubing

Signed-off-by: shaofengshi <shaofeng...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c6cfa698
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c6cfa698
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c6cfa698

Branch: refs/heads/master
Commit: c6cfa69841f96f6d3f411e375fbe1779e819cd84
Parents: 9265e15
Author: kangkaisen <kangkai...@meituan.com>
Authored: Mon Dec 4 20:42:22 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Thu Dec 7 13:25:27 2017 +0800

----------------------------------------------------------------------
 .../engine/mr/common/AbstractHadoopJob.java    |  4 +-
 .../mr/common/SerializableConfiguration.java   | 50 ++++++++++++++++++++
 .../kylin/engine/spark/SparkCubingByLayer.java | 37 +++++++++------
 3 files changed, 76 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c6cfa698/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 6e67488..ade07e9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -468,7 +468,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
-    public static KylinConfig loadKylinConfigFromHdfs(String uri) {
+    public static KylinConfig loadKylinConfigFromHdfs(SerializableConfiguration conf, String uri) {
+        HadoopUtil.setCurrentConfiguration(conf.get());
+
         if (uri == null)
             throw new IllegalArgumentException("meta url should not be null");
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c6cfa698/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java
new file mode 100644
index 0000000..b390432
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/SerializableConfiguration.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+//https://stackoverflow.com/questions/38224132/use-sparkcontext-hadoop-configuration-within-rdd-methods-closures-like-foreachp
+public class SerializableConfiguration implements Serializable {
+    Configuration conf;
+
+    public SerializableConfiguration(Configuration hadoopConf) {
+        this.conf = hadoopConf;
+    }
+
+    public SerializableConfiguration() {
+        this.conf = new Configuration();
+    }
+
+    public Configuration get() {
+        return this.conf;
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+        this.conf.write(out);
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws IOException {
+        this.conf = new Configuration();
+        this.conf.readFields(in);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c6cfa698/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index f7c5fee..0d26815 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -59,6 +59,7 @@ import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
@@ -132,8 +133,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         JavaSparkContext sc = new JavaSparkContext(conf);
 
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
 
-        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -169,17 +171,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         // encode with dimension encoding, transform to <ByteArray, Object[]> RDD
         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD()
-                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl));
+                .mapToPair(new EncodeBaseCuboid(cubeName, segmentId, metaUrl, sConf));
 
         Long totalCount = 0L;
         if (envConfig.isSparkSanityCheckEnabled()) {
             totalCount = encodedBaseRDD.count();
         }
 
-        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl);
+        final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(cubeName, metaUrl, sConf);
         BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction;
         if (allNormalMeasure == false) {
-            reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, needAggr);
+            reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr);
         }
 
         final int totalLevels = CuboidUtil.getLongestDepth(cubeSegment.getCuboidScheduler().getAllCuboidIds());
@@ -195,7 +197,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
             partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
-            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl))
+            allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
@@ -232,6 +234,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             final CubeSegment cubeSeg, final String hdfsBaseLocation, final int level, final Job job,
             final KylinConfig kylinConfig) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
 
         IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
         outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, cubeSeg.getCuboidScheduler(), level);
@@ -248,7 +251,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
-                        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+                        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
                         CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
                         codec = new BufferedMeasureCodec(desc.getMeasures());
                         initialized = true;
@@ -272,11 +275,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private String cubeName;
         private String segmentId;
        private String metaUrl;
+        private SerializableConfiguration conf;
 
-        public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl) {
+        public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
             this.metaUrl = metaurl;
+            this.conf = conf;
         }
 
         @Override
@@ -284,7 +289,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
-                        KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+                        KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
                         CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
                         CubeDesc cubeDesc = cubeInstance.getDescriptor();
                         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -327,14 +332,16 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         protected int measureNum;
         protected MeasureAggregators aggregators;
         protected volatile transient boolean initialized = false;
+        protected SerializableConfiguration conf;
 
-        public BaseCuboidReducerFunction2(String cubeName, String metaUrl) {
+        public BaseCuboidReducerFunction2(String cubeName, String metaUrl, SerializableConfiguration conf) {
             this.cubeName = cubeName;
             this.metaUrl = metaUrl;
+            this.conf = conf;
         }
 
         public void init() {
-            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             cubeDesc = cubeInstance.getDescriptor();
             aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -360,8 +367,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
     static public class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
         private boolean[] needAggr;
 
-        public CuboidReducerFunction2(String cubeName, String metaUrl, boolean[] needAggr) {
-            super(cubeName, metaUrl);
+        public CuboidReducerFunction2(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAggr) {
+            super(cubeName, metaUrl, conf);
             this.needAggr = needAggr;
         }
 
@@ -394,15 +401,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         private NDCuboidBuilder ndCuboidBuilder;
         private RowKeySplitter rowKeySplitter;
         private volatile transient boolean initialized = false;
+        private SerializableConfiguration conf;
 
-        public CuboidFlatMap(String cubeName, String segmentId, String metaUrl) {
+        public CuboidFlatMap(String cubeName, String segmentId, String metaUrl, SerializableConfiguration conf) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
             this.metaUrl = metaUrl;
+            this.conf = conf;
        }
 
         public void init() {
-            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();
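
Illustrative usage sketch (not part of the commit above) of the pattern this change adopts, per the Stack Overflow link in SerializableConfiguration.java: the driver wraps SparkContext's hadoopConfiguration() in the Serializable wrapper, the wrapper is serialized with the task closure via Configuration.write()/readFields(), and executor-side code rebuilds the Configuration through get(). Here "sc" is assumed to be an existing JavaSparkContext, and names such as "sConf", "inputPaths" and the sample HDFS path are hypothetical.

import java.util.Arrays;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

// Driver side: capture the cluster's Hadoop settings in a serializable wrapper.
final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());

JavaRDD<String> inputPaths = sc.parallelize(Arrays.asList("/hypothetical/hdfs/path"));
JavaRDD<Long> fileSizes = inputPaths.map(new Function<String, Long>() {
    @Override
    public Long call(String path) throws Exception {
        // Executor side: rebuild a Configuration from the serialized wrapper,
        // so HDFS access inside the task sees the driver's Hadoop configuration.
        FileSystem fs = FileSystem.get(sConf.get());
        return fs.getFileStatus(new Path(path)).getLen();
    }
});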