minor improvement on logging for shard/reducer sizing temp
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/020cf5f0 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/020cf5f0 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/020cf5f0 Branch: refs/heads/master-cdh5.7 Commit: 020cf5f00ebeb3514493652629e408906ac1786e Parents: e2e2a81 Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Dec 29 14:13:36 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Dec 29 18:51:50 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/engine/mr/steps/CuboidJob.java | 2 +- .../kylin/engine/mr/steps/CuboidReducer.java | 2 +- .../engine/mr/steps/InMemCuboidReducer.java | 2 +- .../engine/mr/steps/LayerReducerNumSizing.java | 84 ++++++++++++++++++++ .../engine/mr/steps/LayerReduerNumSizing.java | 82 ------------------- .../kylin/engine/mr/steps/MergeCuboidJob.java | 2 +- 6 files changed, 88 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/020cf5f0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index bd305c1..ef25b55 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -131,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob { // add metadata to distributed cache attachSegmentMetadataWithDict(segment, job.getConfiguration()); - LayerReduerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel); + LayerReducerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel); this.deletePath(job.getConfiguration(), output); http://git-wip-us.apache.org/repos/asf/kylin/blob/020cf5f0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index b1d4aaa..afd29e3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -91,7 +91,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { for (Text value : values) { if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handling value with ordinal: " + vcounter); + logger.info("Handling value with ordinal (This is not KV number!): " + vcounter); } codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input); if (cuboidLevel > 0) { http://git-wip-us.apache.org/repos/asf/kylin/blob/020cf5f0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index 04c9e90..244889f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -80,7 +80,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra for (ByteArrayWritable value : values) { if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handling value with ordinal: " + vcounter); + logger.info("Handling value with ordinal (This is not KV number!): " + vcounter); } codec.decode(value.asBuffer(), input); aggs.aggregate(input); http://git-wip-us.apache.org/repos/asf/kylin/blob/020cf5f0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java new file mode 100644 index 0000000..713a95c --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.mr.steps; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.job.exception.JobException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LayerReducerNumSizing { + + private static final Logger logger = LoggerFactory.getLogger(LayerReducerNumSizing.class); + + public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { + CubeDesc cubeDesc = cubeSegment.getCubeDesc(); + KylinConfig kylinConfig = cubeDesc.getConfig(); + + double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); + double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); + logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level); + + CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig); + + double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst; + + if (level == -1) { + //merge case + double estimatedSize = cubeStatsReader.estimateCubeSize(); + adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize; + logger.info("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst); + } else if (level == 0) { + //base cuboid case TODO: the estimation could be very WRONG because it has no correction + adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0); + logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst); + } else { + parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1); + currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level); + adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst; + logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst); + } + + // number of reduce tasks + int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99); + + // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance + if (cubeDesc.hasMemoryHungryMeasures()) { + logger.info("Multiply reducer num by 4 to boost performance for memory hungry measures"); + numReduceTasks = numReduceTasks * 4; + } + + // at least 1 reducer by default + numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); + // no more than 500 reducer by default + numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); + + job.setNumReduceTasks(numReduceTasks); + + logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/020cf5f0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java deleted file mode 100644 index 6bddcbd..0000000 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.engine.mr.steps; - -import java.io.IOException; - -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.engine.mr.common.CubeStatsReader; -import org.apache.kylin.job.exception.JobException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LayerReduerNumSizing { - - private static final Logger logger = LoggerFactory.getLogger(LayerReduerNumSizing.class); - - public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { - CubeDesc cubeDesc = cubeSegment.getCubeDesc(); - KylinConfig kylinConfig = cubeDesc.getConfig(); - - double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); - double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); - logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level); - - CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig); - - double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst; - - if (level == -1) { - //merge case - adjustedCurrentLayerSizeEst = cubeStatsReader.estimateCubeSize(); - logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst); - } else if (level == 0) { - //base cuboid case - adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0); - logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst); - } else { - parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1); - currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level); - adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst; - logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst); - } - - // number of reduce tasks - int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio); - - // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance - if (cubeDesc.hasMemoryHungryMeasures()) { - numReduceTasks = numReduceTasks * 4; - } - - // at least 1 reducer by default - numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); - // no more than 500 reducer by default - numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); - - job.setNumReduceTasks(numReduceTasks); - - logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/020cf5f0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java index 012e19f..d9ff616 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java @@ -82,7 +82,7 @@ public class MergeCuboidJob extends CuboidJob { // TODO actually only dictionaries from merging segments are needed attachCubeMetadataWithDict(cube, job.getConfiguration()); - LayerReduerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1); + LayerReducerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1); this.deletePath(job.getConfiguration(), output);