Rename SparkCubingV3 class to SparkCubingByLayer and add RDD unpersist
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0a648fbc Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0a648fbc Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0a648fbc Branch: refs/heads/sparkcubing-rebase Commit: 0a648fbc3ce2dfe142179205fc972764b5d1ab40 Parents: f4db4b5 Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Dec 25 20:28:43 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sun Dec 25 20:28:43 2016 +0800 ---------------------------------------------------------------------- .../spark/SparkBatchCubingJobBuilder2.java | 12 +- .../kylin/engine/spark/SparkCubingByLayer.java | 360 +++++++++++++++++++ .../kylin/engine/spark/SparkCubingV3.java | 354 ------------------ 3 files changed, 366 insertions(+), 360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0a648fbc/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 57b6432..9532d31 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -48,12 +48,12 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) { IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg); final SparkExecutable sparkExecutable = new SparkExecutable(); - sparkExecutable.setClassName(SparkCubingV3.class.getName()); - sparkExecutable.setParam(SparkCubingV3.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); - sparkExecutable.setParam(SparkCubingV3.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); - sparkExecutable.setParam(SparkCubingV3.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName()); - sparkExecutable.setParam(SparkCubingV3.OPTION_CONF_PATH.getOpt(), "/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox/"); //FIXME - sparkExecutable.setParam(SparkCubingV3.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath); + sparkExecutable.setClassName(SparkCubingByLayer.class.getName()); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName()); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), "/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox/"); //FIXME + sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath); StringBuilder jars = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0a648fbc/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java new file mode 100644 index 
0000000..7467e86 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.engine.spark; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.common.RowKeySplitter; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.BatchCubingJobBuilder2; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.common.BaseCuboidBuilder; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.NDCuboidBuilder; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.MeasureAggregators; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkFiles; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.hive.HiveContext; +import org.apache.spark.storage.StorageLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; 
+import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +import static org.apache.kylin.engine.spark.SparkCubing.getKyroClasses; + +/** + */ +public class SparkCubingByLayer extends AbstractApplication implements Serializable { + + protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class); + + public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable"); + public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); + public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId"); + public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath"); + public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); + + private Options options; + + public SparkCubingByLayer() { + options = new Options(); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_CONF_PATH); + options.addOption(OPTION_OUTPUT_PATH); + } + + @Override + protected Options getOptions() { + return options; + } + + private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception { + ClassUtil.addClasspath(confPath); + final File[] files = new File(confPath).listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + if (pathname.getAbsolutePath().endsWith(".xml")) { + return true; + } + if (pathname.getAbsolutePath().endsWith(".properties")) { + return true; + } + return false; + } + }); + for (File file : files) { + sc.addFile(file.getAbsolutePath()); + } + } + + + private static final void prepare() { + final File file = new File(SparkFiles.get("kylin.properties")); + final String confPath = file.getParentFile().getAbsolutePath(); + logger.info("conf directory:" + confPath); + System.setProperty(KylinConfig.KYLIN_CONF, confPath); + ClassUtil.addClasspath(confPath); + } + + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH); + final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH); + final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); + + SparkConf conf = new SparkConf().setAppName("Cubing Application"); + //serialization conf + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.kryo.registrationRequired", "true"); + final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() { + @Override + public boolean apply(@Nullable String input) { + return input != null && input.trim().length() > 0; + } + }); + conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ",")); + + JavaSparkContext sc = 
new JavaSparkContext(conf); + setupClasspath(sc, confPath); + HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath)); + + HiveContext sqlContext = new HiveContext(sc.sc()); + final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable); + + System.setProperty(KylinConfig.KYLIN_CONF, confPath); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + final CubeDesc cubeDesc = cubeInstance.getDescriptor(); + final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); + + final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc); + final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment); + final Broadcast<BufferedMeasureCodec> vCodec = sc.broadcast(new BufferedMeasureCodec(cubeDesc.getMeasures())); + NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(cubeSegment)); + + final Broadcast<NDCuboidBuilder> vNDCuboidBuilder = sc.broadcast(ndCuboidBuilder); + final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue())); + + final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + final int measureNum = cubeDesc.getMeasures().size(); + final BaseCuboidBuilder baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, vCubeDesc.getValue(), vCubeSegment.getValue(), intermediateTableDesc, + AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), + vCodec.getValue(), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); + + boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()]; + boolean allNormalMeasure = true; + for (int i = 0; i < cubeDesc.getMeasures().size(); i++) { + needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid(); + allNormalMeasure = allNormalMeasure && needAggr[i]; + } + logger.info("All measure are normal (agg on all cuboids) ? 
: " + allNormalMeasure); + + StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER(); + + // encode with dimension encoding, transform to <byte[], Object[]> RDD + final JavaPairRDD<byte[], Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, byte[], Object[]>() { + @Override + public Tuple2<byte[], Object[]> call(Row row) throws Exception { + String[] rowArray = rowToArray(row); + baseCuboidBuilder.resetAggrs(); + byte[] rowKey = baseCuboidBuilder.buildKey(rowArray); + Object[] result = baseCuboidBuilder.buildValueObjects(rowArray); + return new Tuple2<>(rowKey, result); + } + + private String[] rowToArray(Row row) { + String[] result = new String[row.size()]; + for (int i = 0; i < row.size(); i++) { + final Object o = row.get(i); + if (o != null) { + result[i] = o.toString(); + } else { + result[i] = null; + } + } + return result; + } + + }); + + + final CuboidReducerFunction2 reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue()); + CuboidReducerFunction2 baseCuboidReducerFunction = reducerFunction2; + if (allNormalMeasure == false) { + baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue(), needAggr); + } + + // aggregate to calculate base cuboid + final JavaPairRDD<byte[], Object[]> baseCuboidRDD = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction).persist(storageLevel); + persistent(baseCuboidRDD, vCodec.getValue(), outputPath, 0, sc.hadoopConfiguration()); + + // aggregate to ND cuboids + final int totalLevels = cubeDesc.getBuildLevel(); + + JavaPairRDD<byte[], Object[]> fatherRDD = baseCuboidRDD; + for (int level = 1; level <= totalLevels; level++) { + JavaPairRDD<byte[], Object[]> childRDD = fatherRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<byte[], Object[]>, byte[], Object[]>() { + + transient boolean initialized = false; + + RowKeySplitter rowKeySplitter = new RowKeySplitter(vCubeSegment.getValue(), 65, 256); + + @Override + public Iterable<Tuple2<byte[], Object[]>> call(Tuple2<byte[], Object[]> tuple2) throws Exception { + if (initialized == false) { + prepare(); + initialized = true; + } + + List<Tuple2<byte[], Object[]>> tuples = Lists.newArrayList(); + byte[] key = tuple2._1(); + long cuboidId = rowKeySplitter.split(key); + Cuboid parentCuboid = Cuboid.findById(vCubeDesc.getValue(), cuboidId); + + Collection<Long> myChildren = vCuboidScheduler.getValue().getSpanningCuboid(cuboidId); + + // if still empty or null + if (myChildren == null || myChildren.size() == 0) { + return tuples; + } + + for (Long child : myChildren) { + Cuboid childCuboid = Cuboid.findById(vCubeDesc.getValue(), child); + Pair<Integer, ByteArray> result = vNDCuboidBuilder.getValue().buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); + + byte[] newKey = new byte[result.getFirst()]; + System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst()); + + tuples.add(new Tuple2<>(newKey, tuple2._2())); + } + + return tuples; + } + }).reduceByKey(reducerFunction2).persist(storageLevel); + + // persistent rdd to hdfs + persistent(childRDD, vCodec.getValue(), outputPath, level, sc.hadoopConfiguration()); + fatherRDD.unpersist(); + fatherRDD = childRDD; + } + + fatherRDD.unpersist(); + logger.info("Finished on calculating all level cuboids."); + + } + + private void persistent(final JavaPairRDD<byte[], Object[]> rdd, final BufferedMeasureCodec codec, final String hdfsBaseLocation, int level, Configuration conf) { + final String 
cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); + final JavaPairRDD<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> serializedRDD = rdd.mapToPair(new PairFunction<Tuple2<byte[], Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() { + @Override + public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<byte[], Object[]> tuple2) throws Exception { + ByteBuffer valueBuf = codec.encode(tuple2._2()); + byte[] encodedBytes = new byte[valueBuf.position()]; + System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position()); + return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1()), new org.apache.hadoop.io.Text(encodedBytes)); + } + }); + logger.debug("Persisting RDD for level " + level + " into " + cuboidOutputPath); + serializedRDD.saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf); + logger.debug("Done: persisting RDD for level " + level); + serializedRDD.unpersist(); + } + + class CuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> { + BufferedMeasureCodec codec; + CubeDesc cubeDesc; + int measureNum; + transient ThreadLocal<MeasureAggregators> current = new ThreadLocal<>(); + + CuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec) { + this.codec = codec; + this.cubeDesc = cubeDesc; + this.measureNum = measureNum; + } + + @Override + public Object[] call(Object[] input1, Object[] input2) throws Exception { + if (current.get() == null) { + current.set(new MeasureAggregators(cubeDesc.getMeasures())); + } + Object[] result = new Object[measureNum]; + current.get().reset(); + current.get().aggregate(input1); + current.get().aggregate(input2); + current.get().collectStates(result); + return result; + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + current = new ThreadLocal(); + } + } + + class BaseCuboidReducerFunction2 extends CuboidReducerFunction2 { + boolean[] needAggr; + + BaseCuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec, boolean[] needAggr) { + super(measureNum, cubeDesc, codec); + this.needAggr = needAggr; + } + + @Override + public Object[] call(Object[] input1, Object[] input2) throws Exception { + if (current.get() == null) { + current.set(new MeasureAggregators(cubeDesc.getMeasures())); + } + current.get().reset(); + Object[] result = new Object[measureNum]; + current.get().aggregate(input1, needAggr); + current.get().aggregate(input2, needAggr); + current.get().collectStates(result); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0a648fbc/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java deleted file mode 100644 index 6f2915a..0000000 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java +++ /dev/null @@ -1,354 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.engine.spark; - -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.AbstractApplication; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.OptionsHelper; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.common.RowKeySplitter; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.cuboid.CuboidScheduler; -import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; -import org.apache.kylin.cube.kv.RowKeyEncoderProvider; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; -import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.engine.mr.BatchCubingJobBuilder2; -import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.engine.mr.common.BaseCuboidBuilder; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.NDCuboidBuilder; -import org.apache.kylin.measure.BufferedMeasureCodec; -import org.apache.kylin.measure.MeasureAggregators; -import org.apache.kylin.measure.MeasureIngester; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkFiles; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.broadcast.Broadcast; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.hive.HiveContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import javax.annotation.Nullable; -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.List; - -import static org.apache.kylin.engine.spark.SparkCubing.getKyroClasses; - -/** - */ -public class SparkCubingV3 extends AbstractApplication implements Serializable { - - protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class); - - public static final Option OPTION_INPUT_PATH = 
OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable"); - public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); - public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId"); - public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath"); - public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); - - private Options options; - - public SparkCubingV3() { - options = new Options(); - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_SEGMENT_ID); - options.addOption(OPTION_CONF_PATH); - options.addOption(OPTION_OUTPUT_PATH); - } - - @Override - protected Options getOptions() { - return options; - } - - private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception { - ClassUtil.addClasspath(confPath); - final File[] files = new File(confPath).listFiles(new FileFilter() { - @Override - public boolean accept(File pathname) { - if (pathname.getAbsolutePath().endsWith(".xml")) { - return true; - } - if (pathname.getAbsolutePath().endsWith(".properties")) { - return true; - } - return false; - } - }); - for (File file : files) { - sc.addFile(file.getAbsolutePath()); - } - } - - - private static final void prepare() { - final File file = new File(SparkFiles.get("kylin.properties")); - final String confPath = file.getParentFile().getAbsolutePath(); - System.out.println("conf directory:" + confPath); - System.setProperty(KylinConfig.KYLIN_CONF, confPath); - ClassUtil.addClasspath(confPath); - } - - - @Override - protected void execute(OptionsHelper optionsHelper) throws Exception { - final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH); - final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); - final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); - final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH); - final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); - - SparkConf conf = new SparkConf().setAppName("Cubing Application"); - //serialization conf - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.set("spark.kryo.registrationRequired", "true"); - final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() { - @Override - public boolean apply(@Nullable String input) { - return input != null && input.trim().length() > 0; - } - }); - conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ",")); - - JavaSparkContext sc = new JavaSparkContext(conf); - setupClasspath(sc, confPath); - HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath)); - - HiveContext sqlContext = new HiveContext(sc.sc()); - final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable); - - System.setProperty(KylinConfig.KYLIN_CONF, confPath); - final KylinConfig kylinConfig = 
KylinConfig.getInstanceFromEnv(); - final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); - final CubeDesc cubeDesc = cubeInstance.getDescriptor(); - final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); - final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); - - final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc); - final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment); - final Broadcast<BufferedMeasureCodec> vCodec = sc.broadcast(new BufferedMeasureCodec(cubeDesc.getMeasures())); - NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(cubeSegment)); - - final Broadcast<NDCuboidBuilder> vNDCuboidBuilder = sc.broadcast(ndCuboidBuilder); - final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue())); - - final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - final int measureNum = cubeDesc.getMeasures().size(); - final BaseCuboidBuilder baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, vCubeDesc.getValue(), vCubeSegment.getValue(), intermediateTableDesc, - AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), - vCodec.getValue(), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); - - boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()]; - boolean allNormalMeasure = true; - for (int i = 0; i < cubeDesc.getMeasures().size(); i++) { - needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid(); - allNormalMeasure = allNormalMeasure && needAggr[i]; - } - logger.info("All measure are normal (agg on all cuboids) ? 
: " + allNormalMeasure); - - // encode with dimension encoding, transform to <byte[], Object[]> RDD - final JavaPairRDD<byte[], Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, byte[], Object[]>() { - @Override - public Tuple2<byte[], Object[]> call(Row row) throws Exception { - String[] rowArray = rowToArray(row); - baseCuboidBuilder.resetAggrs(); - byte[] rowKey = baseCuboidBuilder.buildKey(rowArray); - Object[] result = baseCuboidBuilder.buildValueObjects(rowArray); - return new Tuple2<>(rowKey, result); - } - - private String[] rowToArray(Row row) { - String[] result = new String[row.size()]; - for (int i = 0; i < row.size(); i++) { - final Object o = row.get(i); - if (o != null) { - result[i] = o.toString(); - } else { - result[i] = null; - } - } - return result; - } - - }); - - - final CuboidReducerFunction2 reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue()); - CuboidReducerFunction2 baseCuboidReducerFunction = reducerFunction2; - if (allNormalMeasure == false) { - baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue(), needAggr); - } - - // aggregate to calculate base cuboid - final JavaPairRDD<byte[], Object[]> baseCuboidRDD = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction); - persistent(baseCuboidRDD, vCodec.getValue(), outputPath, 0, sc.hadoopConfiguration()); - - // aggregate to ND cuboids - final int totalLevels = cubeDesc.getBuildLevel(); - - JavaPairRDD<byte[], Object[]> parentRDD = baseCuboidRDD; - for (int level = 1; level <= totalLevels; level++) { - JavaPairRDD<byte[], Object[]> childRDD = parentRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<byte[], Object[]>, byte[], Object[]>() { - - transient boolean initialized = false; - - RowKeySplitter rowKeySplitter = new RowKeySplitter(vCubeSegment.getValue(), 65, 256); - - @Override - public Iterable<Tuple2<byte[], Object[]>> call(Tuple2<byte[], Object[]> tuple2) throws Exception { - if (initialized == false) { - prepare(); - initialized = true; - } - - List<Tuple2<byte[], Object[]>> tuples = Lists.newArrayList(); - byte[] key = tuple2._1(); - long cuboidId = rowKeySplitter.split(key); - Cuboid parentCuboid = Cuboid.findById(vCubeDesc.getValue(), cuboidId); - - Collection<Long> myChildren = vCuboidScheduler.getValue().getSpanningCuboid(cuboidId); - - // if still empty or null - if (myChildren == null || myChildren.size() == 0) { - return tuples; - } - - for (Long child : myChildren) { - Cuboid childCuboid = Cuboid.findById(vCubeDesc.getValue(), child); - Pair<Integer, ByteArray> result = vNDCuboidBuilder.getValue().buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); - - byte[] newKey = new byte[result.getFirst()]; - System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst()); - - tuples.add(new Tuple2<>(newKey, tuple2._2())); - } - - return tuples; - } - }).reduceByKey(reducerFunction2); - - // persistent rdd to hdfs - persistent(childRDD, vCodec.getValue(), outputPath, level, sc.hadoopConfiguration()); - parentRDD = childRDD; - } - - logger.info("Finished on calculating all level cuboids."); - - } - - private void persistent(final JavaPairRDD<byte[], Object[]> rdd, final BufferedMeasureCodec codec, final String hdfsBaseLocation, int level, Configuration conf) { - final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); - final JavaPairRDD<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> 
serializedRDD = rdd.mapToPair(new PairFunction<Tuple2<byte[], Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() { - @Override - public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<byte[], Object[]> tuple2) throws Exception { - ByteBuffer valueBuf = codec.encode(tuple2._2()); - byte[] encodedBytes = new byte[valueBuf.position()]; - System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position()); - return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1()), new org.apache.hadoop.io.Text(encodedBytes)); - } - }); - logger.debug("Persisting RDD for level " + level + " into " + cuboidOutputPath); - serializedRDD.saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf); - logger.debug("Done: persisting RDD for level " + level); - } - - class CuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> { - BufferedMeasureCodec codec; - CubeDesc cubeDesc; - int measureNum; - transient ThreadLocal<MeasureAggregators> current = new ThreadLocal<>(); - - CuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec) { - this.codec = codec; - this.cubeDesc = cubeDesc; - this.measureNum = measureNum; - } - - @Override - public Object[] call(Object[] input1, Object[] input2) throws Exception { - if (current.get() == null) { - current.set(new MeasureAggregators(cubeDesc.getMeasures())); - } - Object[] result = new Object[measureNum]; - current.get().reset(); - current.get().aggregate(input1); - current.get().aggregate(input2); - current.get().collectStates(result); - return result; - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - current = new ThreadLocal(); - } - } - - class BaseCuboidReducerFunction2 extends CuboidReducerFunction2 { - boolean[] needAggr; - - BaseCuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec, boolean[] needAggr) { - super(measureNum, cubeDesc, codec); - this.needAggr = needAggr; - } - - @Override - public Object[] call(Object[] input1, Object[] input2) throws Exception { - if (current.get() == null) { - current.set(new MeasureAggregators(cubeDesc.getMeasures())); - } - current.get().reset(); - Object[] result = new Object[measureNum]; - current.get().aggregate(input1, needAggr); - current.get().aggregate(input2, needAggr); - current.get().collectStates(result); - return result; - } - } -}
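----------------------------------------------------------------------

Beyond the rename, the substantive change in this commit is the caching strategy for the layered build: each level's RDD is persisted with StorageLevel.MEMORY_AND_DISK_SER, and the parent level is unpersisted once its child level has been aggregated and written to HDFS. A minimal sketch of that persist/unpersist pattern follows (illustrative names such as encodedBaseRDD, reducer, spanningFunction, saveLevel and totalLevels stand in for the real Kylin pieces; this is not the committed code itself):

    StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER();

    // level 0: base cuboid, cached because level 1 is derived from it
    JavaPairRDD<byte[], Object[]> parent = encodedBaseRDD
            .reduceByKey(reducer)
            .persist(storageLevel);
    saveLevel(parent, 0);                      // write level 0 to HDFS

    for (int level = 1; level <= totalLevels; level++) {
        // derive and cache the next level before releasing its parent
        JavaPairRDD<byte[], Object[]> child = parent
                .flatMapToPair(spanningFunction)
                .reduceByKey(reducer)
                .persist(storageLevel);
        saveLevel(child, level);               // write this level to HDFS

        parent.unpersist();                    // parent's cached blocks no longer needed
        parent = child;
    }
    parent.unpersist();                        // release the last cached level as well

Persisting each level avoids recomputing the whole lineage from the Hive source when the next level is reduced, while the eager unpersist keeps at most two levels of cuboid data in the block store at any time.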