This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push:
     new 04394df  KYLIN-3850 Flink cubing step : merge by layer
04394df is described below

commit 04394df3b3a5bb664fbd66d323a7769d431692d8
Author: yanghua <yanghua1...@gmail.com>
AuthorDate: Fri Mar 15 20:16:23 2019 +0800

    KYLIN-3850 Flink cubing step : merge by layer
---
 .../kylin/engine/flink/FlinkCubingMerge.java       | 309 +++++++++++++++++++++
 1 file changed, 309 insertions(+)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
new file mode 100644
index 0000000..c51d69c
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Flink application to merge cube segments with the "by-layer" algorithm. Only supports source data from Hive; metadata is in HBase.
+ */
+public class FlinkCubingMerge extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(FlinkCubingMerge.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
+
+    private Options options;
+
+    private String cubeName;
+    private String metaUrl;
+
+    public FlinkCubingMerge() {
+        options = new Options();
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        this.metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        this.cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+        Job job = Job.getInstance();
+        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compression
+
+        HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+        logger.info("Input path: {}", inputPath);
+        logger.info("Output path: {}", outputPath);
+
+        FlinkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+
+        final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+        final String[] inputFolders = StringSplitter.split(inputPath, ",");
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        boolean isLegacyMode = false;
+        for (String inputFolder : inputFolders) {
+            Path baseCuboidPath = new Path(BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputFolder, 0));
+            if (!fs.exists(baseCuboidPath)) {
+                // the level sub folder does not exist, which means the merged cuboids sit in one folder (not by layer)
+                isLegacyMode = true;
+                break;
+            }
+        }
+
+        if (isLegacyMode) {
+            // merge the cuboids of all layers at once; this might be hard for Flink
+            List<DataSet<Tuple2<Text, Object[]>>> mergingSegs = Lists.newArrayListWithExpectedSize(inputFolders.length);
+            for (int i = 0; i < inputFolders.length; i++) {
+                String path = inputFolders[i];
+                DataSet segRdd = FlinkUtil.parseInputPath(path, fs, env, Text.class, Text.class);
+                CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+                // re-encode with new dictionaries
+                DataSet<Tuple2<Text, Object[]>> newEncodedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
+                        sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+                mergingSegs.add(newEncodedRdd);
+            }
+
+            FileOutputFormat.setOutputPath(job, new Path(outputPath));
+            HadoopOutputFormat<Text, Text> hadoopOF =
+                    new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
+
+            if (mergingSegs.size() > 0) {
+                DataSet unionedDataSet = mergingSegs.get(0);
+                for (int i = 1; i < mergingSegs.size(); i++) {
+                    unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
+                }
+
+                unionedDataSet
+                        .groupBy(0)
+                        .reduce(new MeasureReduceFunction(aggregators))
+                        .setParallelism(FlinkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
+                        .map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
+                        .output(hadoopOF);
+            }
+        } else {
+            // merge by layer
+            for (int level = 0; level <= totalLevels; level++) {
+                List<DataSet<Tuple2<Text, Object[]>>> mergingSegs = Lists.newArrayList();
+                for (int i = 0; i < inputFolders.length; i++) {
+                    String path = inputFolders[i];
+                    CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+                    final String cuboidInputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(path, level);
+                    DataSet<Tuple2<Text, Text>> segRdd = env.createInput(
+                            HadoopInputs.readSequenceFile(Text.class, Text.class, cuboidInputPath));
+                    // re-encode with new dictionaries
+                    DataSet<Tuple2<Text, Object[]>> newEncodedRdd = segRdd.map(new ReEncodeCuboidFunction(cubeName,
+                            sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+                    mergingSegs.add(newEncodedRdd);
+                }
+
+                final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
+
+                Job jobInstanceForEachOutputFormat = Job.getInstance();
+                FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication=2 and enable compression
+                FlinkUtil.setHadoopConfForCuboid(jobInstanceForEachOutputFormat, cubeSegment, metaUrl);
+
+                FileOutputFormat.setOutputPath(jobInstanceForEachOutputFormat, new Path(cuboidOutputPath));
+                HadoopOutputFormat<Text, Text> hadoopOF =
+                        new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), jobInstanceForEachOutputFormat);
+
+                if (mergingSegs.size() > 0) {
+                    DataSet unionedDataSet = mergingSegs.get(0);
+                    for (int i = 1; i < mergingSegs.size(); i++) {
+                        unionedDataSet = unionedDataSet.union(mergingSegs.get(i));
+                    }
+
+                    unionedDataSet
+                            .groupBy(0)
+                            .reduce(new MeasureReduceFunction(aggregators))
+                            .setParallelism(FlinkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig))
+                            .map(new ConvertTextMapFunction(sConf, metaUrl, cubeName))
+                            .output(hadoopOF);
+                }
+            }
+        }
+
+        env.execute("Merge segments for cube: " + cubeName + ", segment " + segmentId);
+        // output the data size to console, job engine will parse and save the metric
+    }
+
+    private CubeSegment findSourceSegment(String filePath, CubeInstance cube) {
+        String jobID = JobBuilderSupport.extractJobIDFromPath(filePath);
+        return CubeInstance.findSegmentWithJobId(jobID, cube);
+    }
+
+    private static class ReEncodeCuboidFunction extends RichMapFunction<Tuple2<Text, Text>, Tuple2<Text, Object[]>> {
+        private String cubeName;
+        private String sourceSegmentId;
+        private String mergedSegmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private transient KylinConfig kylinConfig;
+        private transient SegmentReEncoder segmentReEncoder = null;
+
+        ReEncodeCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
+                SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.sourceSegmentId = sourceSegmentId;
+            this.mergedSegmentId = mergedSegmentId;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            this.kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cube.getDescName());
+            final CubeSegment sourceSeg = cube.getSegmentById(sourceSegmentId);
+            final CubeSegment mergedSeg = cube.getSegmentById(mergedSegmentId);
+            this.segmentReEncoder = new SegmentReEncoder(cubeDesc, sourceSeg, mergedSeg, kylinConfig);
+        }
+
+        @Override
+        public Tuple2<Text, Object[]> map(Tuple2<Text, Text> textTextTuple2) throws Exception {
+            Pair<Text, Object[]> encodedPair = segmentReEncoder.reEncode2(textTextTuple2.f0, textTextTuple2.f1);
+            return new Tuple2<>(encodedPair.getFirst(), encodedPair.getSecond());
+        }
+    }
+
+    private static class MeasureReduceFunction implements ReduceFunction<Object[]> {
+
+        private MeasureAggregators aggregators;
+
+        public MeasureReduceFunction(MeasureAggregators aggregators) {
+            this.aggregators = aggregators;
+        }
+
+        @Override
+        public Object[] reduce(Object[] input1, Object[] input2) throws Exception {
+            Object[] measureObjs = new Object[input1.length];
+            aggregators.aggregate(input1, input2, measureObjs);
+            return measureObjs;
+        }
+    }
+
+    private static class ConvertTextMapFunction extends RichMapFunction<Tuple2<Text, Object[]>, Tuple2<Text, Text>> {
+
+        private BufferedMeasureCodec codec;
+        private SerializableConfiguration sConf;
+        private String metaUrl;
+        private String cubeName;
+
+        public ConvertTextMapFunction(SerializableConfiguration sConf, String metaUrl, String cubeName) {
+            this.sConf = sConf;
+            this.metaUrl = metaUrl;
+            this.cubeName = cubeName;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+                codec = new BufferedMeasureCodec(desc.getMeasures());
+            }
+        }
+
+        @Override
+        public Tuple2<Text, Text> map(Tuple2<Text, Object[]> tuple2) throws Exception {
+            ByteBuffer valueBuf = codec.encode(tuple2.f1);
+            byte[] encodedBytes = new byte[valueBuf.position()];
+            System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+            return new Tuple2<>(tuple2.f0, new Text(encodedBytes));
+        }
+    }
+
+}
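
Both branches of the merge above boil down to the same DataSet pipeline: union the re-encoded segment inputs, group by the cuboid key (tuple field 0), collapse duplicate keys with a ReduceFunction, then re-encode the merged records and hand them to a HadoopOutputFormat. The snippet below is a minimal, self-contained sketch of that shape using only the vanilla Flink DataSet API; the class name, the sample records and the SUM reduce are illustrative stand-ins for Kylin's Text keys, MeasureAggregators and BufferedMeasureCodec, not code from this commit.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class LayerMergeSketch {

        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Two "segments" of the same cuboid level: (cuboid key, measure value).
            DataSet<Tuple2<String, Long>> segmentA = env.fromElements(
                    Tuple2.of("cuboid-1", 10L), Tuple2.of("cuboid-2", 5L));
            DataSet<Tuple2<String, Long>> segmentB = env.fromElements(
                    Tuple2.of("cuboid-1", 7L), Tuple2.of("cuboid-3", 3L));

            // Union the segments and merge rows sharing a cuboid key, mirroring
            // unionedDataSet.groupBy(0).reduce(new MeasureReduceFunction(aggregators)).
            DataSet<Tuple2<String, Long>> merged = segmentA.union(segmentB)
                    .groupBy(0)
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                            // a plain SUM stands in for MeasureAggregators.aggregate(...)
                            return Tuple2.of(a.f0, a.f1 + b.f1);
                        }
                    });

            // Re-encode the merged records for output, mirroring ConvertTextMapFunction.
            merged.map(new MapFunction<Tuple2<String, Long>, String>() {
                @Override
                public String map(Tuple2<String, Long> value) {
                    return value.f0 + "\t" + value.f1;
                }
            }).print(); // the real job writes SequenceFiles through HadoopOutputFormat instead
        }
    }

The real job builds one such pipeline per cuboid level (or a single one for a legacy-layout folder) and replaces print() with a SequenceFileOutputFormat wrapped in Flink's HadoopOutputFormat.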