APACHE-KYLIN-2866: Increase the number of reducers for HyperLogLog statistics calculation in the CalculateStatsFromBaseCuboidJob step
Signed-off-by: lidongsjtu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5425deba Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5425deba Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5425deba Branch: refs/heads/master Commit: 5425debae98f6397bf05f5459416a57d8feb14da Parents: 377ec49 Author: Zhong <[email protected]> Authored: Thu Sep 28 19:12:07 2017 +0800 Committer: lidongsjtu <[email protected]> Committed: Wed Dec 20 23:20:11 2017 +0800 ---------------------------------------------------------------------- .../kylin/engine/mr/common/CubeStatsWriter.java | 25 +++++++-- .../steps/CalculateStatsFromBaseCuboidJob.java | 10 +++- ...CalculateStatsFromBaseCuboidPartitioner.java | 59 ++++++++++++++++++++ .../CalculateStatsFromBaseCuboidReducer.java | 7 ++- .../mr/steps/MergeStatisticsWithOldStep.java | 29 +++++++--- 5 files changed, 111 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index 8f400c3..b1e59a7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -20,7 +20,6 @@ package org.apache.kylin.engine.mr.common; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,6 +34,8 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.measure.BufferedMeasureCodec; import 
org.apache.kylin.measure.hllc.HLLCounter; +import com.google.common.collect.Lists; + public class CubeStatsWriter { public static void writeCuboidStatistics(Configuration conf, Path outputPath, // @@ -45,17 +46,32 @@ public class CubeStatsWriter { public static void writeCuboidStatistics(Configuration conf, Path outputPath, // Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, + mapperOverlapRatio); + } + + public static void writePartialCuboidStatistics(Configuration conf, Path outputPath, // + Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, + int shard) throws IOException { + Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME + "_" + shard); + writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, + mapperOverlapRatio); + } - List<Long> allCuboids = new ArrayList<Long>(); + private static void writeCuboidStatisticsInner(Configuration conf, Path outputFilePath, // + Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) + throws IOException { + List<Long> allCuboids = Lists.newArrayList(); allCuboids.addAll(cuboidHLLMap.keySet()); Collections.sort(allCuboids); ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE); - SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class), SequenceFile.Writer.valueClass(BytesWritable.class)); + SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(outputFilePath), + SequenceFile.Writer.keyClass(LongWritable.class), 
SequenceFile.Writer.valueClass(BytesWritable.class)); try { // mapper overlap ratio at key -1 writer.append(new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio))); - + // mapper number at key -2 writer.append(new LongWritable(-2), new BytesWritable(Bytes.toBytes(mapperNumber))); @@ -72,5 +88,4 @@ public class CubeStatsWriter { IOUtils.closeQuietly(writer); } } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java index b60076c..8f64272 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java @@ -35,6 +35,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.MapReduceUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +78,7 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob { setJobClasspath(job, cube.getConfig()); setupMapper(input); - setupReducer(output, 1); + setupReducer(output, cubeSegment); attachSegmentMetadataWithDict(cubeSegment, job.getConfiguration()); @@ -101,12 +102,15 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob { job.setMapOutputValueClass(Text.class); } - private void setupReducer(Path output, int numberOfReducers) throws IOException { + private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException { + int hllShardBase = 
MapReduceUtil.getHLLShardBase(cubeSeg); + job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, hllShardBase); + job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); - job.setNumReduceTasks(numberOfReducers); + job.setNumReduceTasks(hllShardBase); FileOutputFormat.setOutputPath(job, output); job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java new file mode 100644 index 0000000..70db21b --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class CalculateStatsFromBaseCuboidPartitioner extends Partitioner<Text, Text> implements Configurable { + private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidPartitioner.class); + + private Configuration conf; + private int hllShardBase = 1; + + @Override + public int getPartition(Text key, Text value, int numReduceTasks) { + Long cuboidId = Bytes.toLong(key.getBytes()); + int shard = cuboidId.hashCode() % hllShardBase; + if (shard < 0) { + shard += hllShardBase; + } + return numReduceTasks - shard - 1; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 1); + logger.info("shard base for hll is " + hllShardBase); + } + + @Override + public Configuration getConf() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java index 489dac4..7210622 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java @@ -55,6 +55,8 @@ public class CalculateStatsFromBaseCuboidReducer extends 
KylinReducer<Text, Text private String output = null; private int samplingPercentage; + private int taskId; + @Override protected void doSetup(Context context) throws IOException { super.bindCurrentConfiguration(context.getConfiguration()); @@ -72,6 +74,7 @@ public class CalculateStatsFromBaseCuboidReducer extends KylinReducer<Text, Text samplingPercentage = Integer .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); + taskId = context.getTaskAttemptID().getTaskID().getId(); cuboidHLLMap = Maps.newHashMap(); } @@ -106,7 +109,7 @@ public class CalculateStatsFromBaseCuboidReducer extends KylinReducer<Text, Text } double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(output), // - cuboidHLLMap, samplingPercentage, baseCuboidRowCountInMappers.size(), mapperOverlapRatio); + CubeStatsWriter.writePartialCuboidStatistics(context.getConfiguration(), new Path(output), // + cuboidHLLMap, samplingPercentage, baseCuboidRowCountInMappers.size(), mapperOverlapRatio, taskId); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5425deba/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java index e97c6bb..7855c06 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java @@ -63,7 +63,6 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable { final CubeManager mgr = CubeManager.getInstance(context.getConfig()); final CubeInstance cube = 
mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())); final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); - final String statsInputPath = CubingExecutableUtil.getStatisticsPath(this.getParams()); CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment); Preconditions.checkNotNull(oldSegment, @@ -76,16 +75,28 @@ public class MergeStatisticsWithOldStep extends AbstractExecutable { try { //1. Add statistics from optimized segment - Path statisticsFilePath = new Path(statsInputPath, - BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + Path statisticsDirPath = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams())); FileSystem hdfs = FileSystem.get(conf); - if (!hdfs.exists(statisticsFilePath)) - throw new IOException("File " + statisticsFilePath + " does not exists"); + if (!hdfs.exists(statisticsDirPath)) { + throw new IOException("StatisticsFilePath " + statisticsDirPath + " does not exists"); + } + + if (!hdfs.isDirectory(statisticsDirPath)) { + throw new IOException("StatisticsFilePath " + statisticsDirPath + " is not a directory"); + } - CubeStatsReader optimizeSegmentStatsReader = new CubeStatsReader(optimizeSegment, null, - optimizeSegment.getConfig(), statisticsFilePath); - averageSamplingPercentage += optimizeSegmentStatsReader.getSamplingPercentage(); - addFromCubeStatsReader(optimizeSegmentStatsReader); + Path[] statisticsFiles = HadoopUtil.getFilterPath(hdfs, statisticsDirPath, + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + if (statisticsFiles == null) { + throw new IOException("fail to find the statistics file in base dir: " + statisticsDirPath); + } + + for (Path item : statisticsFiles) { + CubeStatsReader optimizeSegmentStatsReader = new CubeStatsReader(optimizeSegment, null, + optimizeSegment.getConfig(), item); + averageSamplingPercentage += optimizeSegmentStatsReader.getSamplingPercentage(); + 
addFromCubeStatsReader(optimizeSegmentStatsReader); + } //2. Add statistics from old segment CubeStatsReader oldSegmentStatsReader = new CubeStatsReader(oldSegment, null, oldSegment.getConfig());
