KYLIN-2623 Move output (HBase) code to output side
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a38b02df
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a38b02df
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a38b02df

Branch: refs/heads/master
Commit: a38b02df0387541684aa1689e044927af650f1c7
Parents: edc4d4c
Author: Roger Shi <rogershijich...@hotmail.com>
Authored: Tue May 16 13:50:23 2017 +0800
Committer: liyang-gmt8 <liy...@apache.org>
Committed: Tue May 16 14:03:50 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/IMROutput2.java  |  33 +++++-
 .../engine/mr/common/AbstractHadoopJob.java     |   6 +-
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  37 +++----
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |  55 ++--------
 .../engine/mr/steps/LayerReducerNumSizing.java  |  80 --------------
 .../kylin/engine/mr/steps/MergeCuboidJob.java   |  37 ++++---
 .../engine/mr/steps/MergeCuboidMapper.java      |  35 +-----
 .../kylin/engine/mr/steps/ReducerNumSizing.java | 106 +++++++++++++++++++
 .../apache/kylin/source/hive/HiveMRInput.java   |   2 -
 .../apache/kylin/source/kafka/KafkaMRInput.java |   1 -
 .../hbase/steps/HBaseMROutput2Transition.java   |  97 +++++++++++++++++
 11 files changed, 284 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 603f207..69bba0a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -18,11 +18,14 @@
 
 package org.apache.kylin.engine.mr;
 
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
-import java.util.List;
-
 public interface IMROutput2 {
 
     /** Return a helper to participate in batch cubing job flow. */
@@ -53,6 +56,19 @@ public interface IMROutput2 {
 
         /** Add step that does any necessary clean up. */
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+        public IMROutputFormat getOuputFormat();
+
+    }
+
+    public interface IMROutputFormat {
+
+        /** Configure the InputFormat of given job. */
+        public void configureJobInput(Job job, String input) throws Exception;
+
+        /** Configure the OutputFormat of given job. */
+        public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception;
+
     }
 
     /** Return a helper to participate in batch merge job flow. */
@@ -82,6 +98,19 @@ public interface IMROutput2 {
 
         /** Add step that does any necessary clean up. */
         public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+
+        public IMRMergeOutputFormat getOuputFormat();
+    }
+
+    public interface IMRMergeOutputFormat {
+
+        /** Configure the InputFormat of given job. */
+        public void configureJobInput(Job job, String input) throws Exception;
+
+        /** Configure the OutputFormat of given job. */
+        public void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception;
+
+        public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube);
     }
 }
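The two new nested interfaces are the heart of this change: engine-side jobs stop hard-coding HBase-flavored input/output wiring and instead delegate it to whatever IMROutput2 implementation the storage layer provides. A minimal sketch of an implementation follows; the class name is illustrative, not part of the commit (the real HBase one, HBaseMROutputFormat, appears later in this diff):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMROutput2;

// Hypothetical storage-side implementation of the new output SPI.
public class ExampleMROutputFormat implements IMROutput2.IMROutputFormat {

    @Override
    public void configureJobInput(Job job, String input) throws Exception {
        // cuboid files written by the previous layer are sequence files
        job.setInputFormatClass(SequenceFileInputFormat.class);
    }

    @Override
    public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception {
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(output));
        // a real implementation also sizes reducers here; see HBaseMROutputFormat below
    }
}
```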
http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 44686d6..764cbdd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -559,7 +559,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         HadoopUtil.deletePath(conf, path);
     }
 
-    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
+    public static double getTotalMapInputMB(Job job) throws ClassNotFoundException, IOException, InterruptedException, JobException {
         if (job == null) {
             throw new JobException("Job is null");
         }
@@ -576,6 +576,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return totalMapInputMB;
     }
 
+    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        return getTotalMapInputMB(job);
+    }
+
     protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
         if (job == null) {
             throw new JobException("Job is null");
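Making getTotalMapInputMB(Job) a static overload is what lets the storage side size reducers without subclassing AbstractHadoopJob. Roughly, a caller outside the job class can now do the following (a sketch using classes introduced in this commit; assumes the Job already has its input configured):

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.steps.ReducerNumSizing;

public class SizingHelper {
    // Size reducers from outside AbstractHadoopJob, the way the new HBase
    // output format does further down in this diff.
    public static int sizeReducers(Job job, CubeSegment segment, int level) throws Exception {
        double inputMB = AbstractHadoopJob.getTotalMapInputMB(job);
        return ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, inputMB, level);
    }
}
```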
http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index b2e186d..6a8ba4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -18,24 +18,19 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -84,11 +79,10 @@ public class CuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_NCUBOID_LEVEL);
-            options.addOption(OPTION_INPUT_FORMAT);
             options.addOption(OPTION_CUBING_JOB_ID);
             parseOptions(options, args);
 
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String output = getOptionValue(OPTION_OUTPUT_PATH);
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
             String segmentID = getOptionValue(OPTION_SEGMENT_ID);
@@ -109,8 +103,10 @@ public class CuboidJob extends AbstractHadoopJob {
 
             setJobClasspath(job, cube.getConfig());
 
+            // add metadata to distributed cache
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
+
             // Mapper
-            configureMapperInputFormat(segment);
             job.setMapperClass(this.mapperClass);
             job.setMapOutputKeyClass(Text.class);
             job.setMapOutputValueClass(Text.class);
@@ -118,22 +114,20 @@ public class CuboidJob extends AbstractHadoopJob {
 
             // Reducer
             job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
-            FileOutputFormat.setOutputPath(job, output);
+            // set input
+            configureMapperInputFormat(segment);
+
+            // set output
+            IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
+            outputFormat.configureJobOutput(job, output, segment, nCuboidLevel);
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
 
-            // add metadata to distributed cache
-            attachSegmentMetadataWithDict(segment, job.getConfiguration());
-
-            job.setNumReduceTasks(LayerReducerNumSizing.getReduceTaskNum(segment, getTotalMapInputMB(), nCuboidLevel));
-
-            this.deletePath(job.getConfiguration(), output);
 
             return waitForCompletion(job);
         } finally {
@@ -142,7 +136,7 @@ public class CuboidJob extends AbstractHadoopJob {
         }
     }
 
-    private void configureMapperInputFormat(CubeSegment cubeSeg) throws IOException {
+    private void configureMapperInputFormat(CubeSegment cubeSeg) throws Exception {
         String input = getOptionValue(OPTION_INPUT_PATH);
 
         if ("FLAT_TABLE".equals(input)) {
@@ -151,12 +145,9 @@ public class CuboidJob extends AbstractHadoopJob {
             flatTableInputFormat.configureJob(job);
         } else {
             // n-dimension cuboid case
+            IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
+            outputFormat.configureJobInput(job, input);
             FileInputFormat.setInputPaths(job, new Path(input));
-            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
-                job.setInputFormatClass(TextInputFormat.class);
-            } else {
-                job.setInputFormatClass(SequenceFileInputFormat.class);
-            }
         }
     }


http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 7706bac..73a2eb9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -18,29 +18,21 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,28 +99,24 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
-            // set input
-            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
-            flatTableInputFormat.configureJob(job);
-
             // set mapper
             job.setMapperClass(InMemCuboidMapper.class);
             job.setMapOutputKeyClass(ByteArrayWritable.class);
             job.setMapOutputValueClass(ByteArrayWritable.class);
 
-            // set output
-            job.setReducerClass(InMemCuboidReducer.class);
-            job.setNumReduceTasks(calculateReducerNum(segment));
-
+            // set reducer
             // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setReducerClass(InMemCuboidReducer.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
-            Path outputPath = new Path(output);
-            FileOutputFormat.setOutputPath(job, outputPath);
+            // set input
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
 
-            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+            // set output
+            IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
+            outputFormat.configureJobOutput(job, output, segment, 0);
 
             return waitForCompletion(job);
         } finally {
@@ -137,31 +125,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
         }
     }
 
-    private int calculateReducerNum(CubeSegment cubeSeg) throws IOException {
-        KylinConfig kylinConfig = cubeSeg.getConfig();
-
-        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, kylinConfig).getCuboidSizeMap();
-        double totalSizeInM = 0;
-        for (Double cuboidSize : cubeSizeMap.values()) {
-            totalSizeInM += cuboidSize;
-        }
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB);
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        logger.info("Having total map input MB " + Math.round(totalSizeInM));
-        logger.info("Having per reduce MB " + perReduceInputMB);
-        logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks);
-        return numReduceTasks;
-    }
-
     public static void main(String[] args) throws Exception {
         InMemCuboidJob job = new InMemCuboidJob();
         int exitCode = ToolRunner.run(job, args);
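Note the reordering in both jobs: input and output configuration now run after the mapper and reducer classes are registered. The ordering is load-bearing, because the HBase implementation of configureJobOutput (later in this diff) inspects job.getMapperClass() to choose a reducer-sizing strategy. A schematic sketch of the new order, under those assumptions:

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.steps.CuboidReducer;
import org.apache.kylin.engine.mr.steps.NDCuboidMapper;

public class WiringOrderSketch {
    // configureJobOutput(...) must run after setMapperClass(...): the HBase
    // output side (below) reads job.getMapperClass() to pick its sizing path.
    static void wire(Job job, CubeSegment segment, String output, int level) throws Exception {
        job.setMapperClass(NDCuboidMapper.class);        // 1. mapper first
        job.setReducerClass(CuboidReducer.class);        // 2. reducer
        IMROutput2.IMROutputFormat out = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
        out.configureJobOutput(job, output, segment, level); // 3. output last
    }
}
```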
http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
deleted file mode 100644
index 7ce9842..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LayerReducerNumSizing {
-
-    private static final Logger logger = LoggerFactory.getLogger(LayerReducerNumSizing.class);
-
-    public static int getReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-        KylinConfig kylinConfig = cubeDesc.getConfig();
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
-
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
-
-        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-
-        if (level == -1) {
-            //merge case
-            double estimatedSize = cubeStatsReader.estimateCubeSize();
-            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
-            logger.info("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst);
-        } else if (level == 0) {
-            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
-            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
-            logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
-        } else {
-            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
-            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
-            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
-            logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
-        }
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
-
-        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
-        if (cubeDesc.hasMemoryHungryMeasures()) {
-            logger.info("Multiply reducer num by 4 to boost performance for memory hungry measures");
-            numReduceTasks = numReduceTasks * 4;
-        }
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        return numReduceTasks;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index 84b76e3..63d0619 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -19,15 +19,14 @@
 package org.apache.kylin.engine.mr.steps;
 
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 
 public class MergeCuboidJob extends CuboidJob {
@@ -44,11 +43,14 @@ public class MergeCuboidJob extends CuboidJob {
             options.addOption(OPTION_OUTPUT_PATH);
             parseOptions(options, args);
 
+            String input = getOptionValue(OPTION_INPUT_PATH);
+            String output = getOptionValue(OPTION_OUTPUT_PATH);
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             String segmentID = getOptionValue(OPTION_SEGMENT_ID);
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
 
             // start job
             String jobName = getOptionValue(OPTION_JOB_NAME);
@@ -57,35 +59,32 @@ public class MergeCuboidJob extends CuboidJob {
 
             setJobClasspath(job, cube.getConfig());
 
-            // set inputs
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
+            // add metadata to distributed cache
+            // TODO actually only dictionaries from merging segments are needed
+            attachCubeMetadataWithDict(cube, job.getConfiguration());
 
             // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
             job.setMapperClass(MergeCuboidMapper.class);
             job.setMapOutputKeyClass(Text.class);
             job.setMapOutputValueClass(Text.class);
 
+            // Reducer
             job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
+            // set inputs
+            IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getOuputFormat();
+            outputFormat.configureJobInput(job, input);
+            addInputDirs(input, job);
+
+            // set output
+            outputFormat.configureJobOutput(job, output, cubeSeg);
+
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
-            // add metadata to distributed cache
-            // TODO actually only dictionaries from merging segments are needed
-            attachCubeMetadataWithDict(cube, job.getConfiguration());
-
-            job.setNumReduceTasks(LayerReducerNumSizing.getReduceTaskNum(cube.getSegmentById(segmentID), getTotalMapInputMB(), -1));
-
-            this.deletePath(job.getConfiguration(), output);
-
             return waitForCompletion(job);
         } finally {
             if (job != null)
http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index fccd48a..a603fc8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -44,7 +42,9 @@ import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
@@ -110,7 +110,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         // decide which source segment
         FileSplit fileSplit = (FileSplit) context.getInputSplit();
-        sourceCubeSegment = findSourceSegment(fileSplit, cube);
+        IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat();
+        sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
 
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
@@ -146,34 +147,6 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         }
     }
 
-    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
-    public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
-        String filePath = fileSplit.getPath().toString();
-        String jobID = extractJobIDFromPath(filePath);
-        return findSegmentWithUuid(jobID, cube);
-    }
-
-    private static String extractJobIDFromPath(String path) {
-        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
-        // check the first occurrence
-        if (matcher.find()) {
-            return matcher.group(1);
-        } else {
-            throw new IllegalStateException("Can not extract job ID from file path : " + path);
-        }
-    }
-
-    private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
-        for (CubeSegment segment : cubeInstance.getSegments()) {
-            String lastBuildJobID = segment.getLastBuildJobID();
-            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
-                return segment;
-            }
-        }
-        throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-    }
-
     @Override
     public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
         long cuboidID = rowKeySplitter.split(key.getBytes());


http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
new file mode 100644
index 0000000..eb1adad
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReducerNumSizing {
+
+    private static final Logger logger = LoggerFactory.getLogger(ReducerNumSizing.class);
+
+    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        KylinConfig kylinConfig = cubeDesc.getConfig();
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
+
+        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+        if (level == -1) {
+            //merge case
+            double estimatedSize = cubeStatsReader.estimateCubeSize();
+            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+            logger.info("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst);
+        } else if (level == 0) {
+            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+            logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+        } else {
+            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+            logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+        }
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+        if (cubeDesc.hasMemoryHungryMeasures()) {
+            logger.info("Multiply reducer num by 4 to boost performance for memory hungry measures");
+            numReduceTasks = numReduceTasks * 4;
+        }
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        return numReduceTasks;
+    }
+
+    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg) throws IOException {
+        KylinConfig kylinConfig = cubeSeg.getConfig();
+
+        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, kylinConfig).getCuboidSizeMap();
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB);
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        logger.info("Having total map input MB " + Math.round(totalSizeInM));
+        logger.info("Having per reduce MB " + perReduceInputMB);
+        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+        return numReduceTasks;
+    }
+}
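To make the layered sizing formula concrete, here is a worked example with made-up numbers; the config values in the comments (500 MB per reducer, ratio 1.0, clamp to [1, 500]) are assumed defaults suggested by the code comments, not values read from this commit:

```java
public class ReducerSizingExample {
    public static void main(String[] args) {
        // Illustrative inputs for a layer with level > 0.
        double totalMapInputMB = 12000;     // measured map input for this job
        double parentLayerSizeEst = 4000;   // estimated size of layer N-1
        double currentLayerSizeEst = 2000;  // estimated size of layer N
        double perReduceInputMB = 500;      // assumed config default
        double reduceCountRatio = 1.0;      // assumed config default

        // Scale the current layer's estimate by the measured/estimated parent ratio.
        double adjusted = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst; // 6000 MB

        // Same rounding as getLayeredCubingReduceTaskNum: round(x + 0.99) acts as a ceiling.
        int numReduceTasks = (int) Math.round(adjusted / perReduceInputMB * reduceCountRatio + 0.99); // 13

        // Clamp to the configured min/max (at least 1, no more than 500 by default).
        numReduceTasks = Math.max(1, Math.min(500, numReduceTasks));

        System.out.println(numReduceTasks); // prints 13
    }
}
```

A cube with memory-hungry measures (e.g. DISTINCT_COUNT) would then multiply this number by 4 before the clamp, as the code above shows.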
http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index d7a2c7e..6c542ab 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -112,8 +112,6 @@ public class HiveMRInput implements IMRInput {
                 HCatInputFormat.setInput(job, dbName, tableName);
 
                 job.setInputFormatClass(HCatInputFormat.class);
-
-                job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }


http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 500e1e9..4c140be 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -117,7 +117,6 @@ public class KafkaMRInput implements IMRInput {
         @Override
         public void configureJob(Job job) {
             job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapOutputValueClass(Text.class);
             String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID);
             IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment);
             String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
http://git-wip-us.apache.org/repos/asf/kylin/blob/a38b02df/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index c4df354..31cb189 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -19,10 +19,25 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
+import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
 import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
+import org.apache.kylin.engine.mr.steps.ReducerNumSizing;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,9 +77,38 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                 // nothing to do
             }
+
+            @Override
+            public IMROutputFormat getOuputFormat() {
+                return new HBaseMROutputFormat();
+            }
         };
     }
 
+    public static class HBaseMROutputFormat implements IMROutputFormat {
+
+        @Override
+        public void configureJobInput(Job job, String input) throws Exception {
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+        }
+
+        @Override
+        public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception {
+            int reducerNum = 1;
+            Class mapperClass = job.getMapperClass();
+            if (mapperClass == HiveToBaseCuboidMapper.class || mapperClass == NDCuboidMapper.class) {
+                reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), level);
+            } else if (mapperClass == InMemCuboidMapper.class) {
+                reducerNum = ReducerNumSizing.getInmemCubingReduceTaskNum(segment);
+            }
+            Path outputPath = new Path(output);
+            FileOutputFormat.setOutputPath(job, outputPath);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setNumReduceTasks(reducerNum);
+            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+        }
+    }
+
     @Override
     public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
         return new IMRBatchMergeOutputSide2() {
@@ -86,6 +130,59 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
                 steps.addMergingGarbageCollectionSteps(jobFlow);
             }
+
+            @Override
+            public IMRMergeOutputFormat getOuputFormat() {
+                return new HBaseMergeMROutputFormat();
+            }
         };
     }
+
+    public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat {
+
+        private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+
+        @Override
+        public void configureJobInput(Job job, String input) throws Exception {
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+        }
+
+        @Override
+        public void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception {
+            int reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), -1);
+            job.setNumReduceTasks(reducerNum);
+
+            Path outputPath = new Path(output);
+            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+            FileOutputFormat.setOutputPath(job, outputPath);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        }
+
+        @Override
+        public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
+            String filePath = fileSplit.getPath().toString();
+            String jobID = extractJobIDFromPath(filePath);
+            return findSegmentWithUuid(jobID, cube);
+        }
+
+        private static String extractJobIDFromPath(String path) {
+            Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+            // check the first occurrence
+            if (matcher.find()) {
+                return matcher.group(1);
+            } else {
+                throw new IllegalStateException("Can not extract job ID from file path : " + path);
+            }
+        }
+
+        private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
+            for (CubeSegment segment : cubeInstance.getSegments()) {
+                String lastBuildJobID = segment.getLastBuildJobID();
+                if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
+                    return segment;
+                }
+            }
+            throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
+        }
+    }
 }
\ No newline at end of file
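The merge-side findSourceSegment relies on each cuboid file path carrying the UUID of the cubing job that produced it; the extracted ID is then matched against each merging segment's last build job ID. A quick demonstration of the pattern (the HDFS path below is made up):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobIdPatternDemo {
    // Same pattern as HBaseMergeMROutputFormat.JOB_NAME_PATTERN above.
    private static final Pattern JOB_NAME_PATTERN =
            Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

    public static void main(String[] args) {
        // Hypothetical cuboid file produced under a job working directory.
        String path = "hdfs://ns1/kylin/kylin-0a1b2c3d-4e5f-6789-abcd-0123456789ab/test_cube/cuboid/level_2/part-r-00000";
        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
        if (matcher.find()) {
            System.out.println(matcher.group(1)); // 0a1b2c3d-4e5f-6789-abcd-0123456789ab
        }
    }
}
```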