APACHE-KYLIN-2866: Enlarge the reducer number for hyperloglog statistics calculation at step FactDistinctColumnsJob
Signed-off-by: Zhong <[email protected]>
Signed-off-by: lidongsjtu <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/377ec491
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/377ec491
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/377ec491

Branch: refs/heads/master
Commit: 377ec491c66e36a217012547fa5ea9b00ae361cc
Parents: a8f35cf
Author: Wang Ken <[email protected]>
Authored: Wed Sep 13 11:36:02 2017 +0800
Committer: lidongsjtu <[email protected]>
Committed: Wed Dec 20 23:20:11 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   8 ++
 .../apache/kylin/common/util/HadoopUtil.java    |  19 ++++
 .../mr/steps/FactDistinctColumnPartitioner.java |  38 ++++++-
 .../engine/mr/steps/FactDistinctColumnsJob.java |  20 +++-
 .../mr/steps/FactDistinctColumnsReducer.java    |  55 ++++++----
 .../engine/mr/steps/SaveStatisticsStep.java     | 107 ++++++++++++++++++-
 6 files changed, 215 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 524b1d4..23a2120 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1040,6 +1040,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
     }
 
+    public int getFactDistinctJobPerReducerHLLCuboidNumber() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.fact-distinct-per-reducer-hll-cuboid-number", "100"));
+    }
+
+    public int getFactDistinctJobHLLMaxReducerNumber() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.fact-distinct-hll-max-reducer-number", "50"));
+    }
+
     //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
     public int getUHCReducerCount() {
         return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5"));

http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index f242515..cafcaf2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -160,4 +160,23 @@ public class HadoopUtil {
             return null;
         }
     }
+
+    public static Path[] getFilterPath(FileSystem fs, Path baseDir, final String filter) throws IOException {
+        if (fs.exists(baseDir) == false) {
+            return null;
+        }
+
+        FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith(filter);
+            }
+        });
+
+        Path[] result = new Path[fileStatus.length];
+        for (int i = 0; i < fileStatus.length; i++) {
+            result[i] = fileStatus[i].getPath();
+        }
+        return result;
+    }
 }
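The two settings added to KylinConfigBase above bound how many reducers the HLL statistics work is spread over: roughly one reducer per kylin.engine.mr.fact-distinct-per-reducer-hll-cuboid-number cuboids, capped by kylin.engine.mr.fact-distinct-hll-max-reducer-number. A minimal standalone sketch of that sizing rule (the class and method names below are illustrative only, not part of this commit):

    // Illustrative sketch only: mirrors the sizing rule implied by the two new
    // kylin.properties knobs above; HllReducerSizingSketch is a hypothetical name.
    public class HllReducerSizingSketch {

        // cuboidCount:       total cuboids in the segment
        // cuboidsPerReducer: kylin.engine.mr.fact-distinct-per-reducer-hll-cuboid-number (default 100)
        // maxHllReducers:    kylin.engine.mr.fact-distinct-hll-max-reducer-number (default 50)
        static int hllReducerCount(int cuboidCount, int cuboidsPerReducer, int maxHllReducers) {
            // ceiling division: one reducer per block of cuboidsPerReducer cuboids ...
            int reducers = (cuboidCount - 1) / cuboidsPerReducer + 1;
            // ... but never more than the configured cap
            return Math.min(reducers, maxHllReducers);
        }

        public static void main(String[] args) {
            // e.g. 1500 cuboids with the defaults -> 15 HLL reducers instead of the single one used before
            System.out.println(hllReducerCount(1500, 100, 50));
        }
    }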
http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 5fcfe42..7ac5d02 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -18,25 +18,55 @@
 package org.apache.kylin.engine.mr.steps;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
-public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortableKey, Text> {
+public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortableKey, Text> implements Configurable {
+    private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnPartitioner.class);
+
+    public static final String HLL_SHARD_BASE_PROPERTY_NAME = "mapreduce.partition.factdistinctcolumnpartitioner.hll.shard.base";
+
+    public static void setHLLShard(Configuration conf, int hllShardBase) {
+        conf.setInt(HLL_SHARD_BASE_PROPERTY_NAME, hllShardBase);
+    }
+
+    private Configuration conf;
+    private int hllShardBase = 1;
 
     @Override
     public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) {
         Text key = skey.getText();
         if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_HLL) {
-            // the last reducer is for merging hll
-            return numReduceTasks - 1;
+            // the last $hllShard reducers are for merging hll
+            Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
+            int shard = cuboidId.hashCode() % hllShardBase;
+            if (shard < 0) {
+                shard += hllShardBase;
+            }
+            return numReduceTasks - shard - 1;
         } else if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) {
             // the last but one reducer is for partition col
-            return numReduceTasks - 2;
+            return numReduceTasks - hllShardBase - 1;
         } else {
             return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
         }
     }
+
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        hllShardBase = conf.getInt(HLL_SHARD_BASE_PROPERTY_NAME, 1);
+        logger.info("shard base for hll is " + hllShardBase);
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 08dadc9..dee384f 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -117,7 +117,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             }
 
             setupMapper(segment);
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
+            setupReducer(output, segment, statistics_enabled, reducerCount);
 
             attachCubeMetadata(cube, job.getConfiguration());
 
@@ -136,6 +136,15 @@
         }
     }
 
+    private int getHLLShardBase(CubeSegment segment) {
+        int nCuboids = segment.getCuboidScheduler().getAllCuboidIds().size();
+        int shardBase = (nCuboids - 1) / segment.getConfig().getFactDistinctJobPerReducerHLLCuboidNumber() + 1;
+        if (shardBase > segment.getConfig().getFactDistinctJobHLLMaxReducerNumber()) {
+            shardBase = segment.getConfig().getFactDistinctJobHLLMaxReducerNumber();
+        }
+        return shardBase;
+    }
+
     private void setupMapper(CubeSegment cubeSeg) throws IOException {
         IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
         flatTableInputFormat.configureJob(job);
@@ -146,7 +155,14 @@
         job.setMapOutputValueClass(Text.class);
     }
 
-    private void setupReducer(Path output, int numberOfReducers) throws IOException {
+    private void setupReducer(Path output, CubeSegment cubeSeg, String statistics_enabled, int reducerCount)
+            throws IOException {
+        int numberOfReducers = reducerCount;
+        if ("true".equalsIgnoreCase(statistics_enabled)) {
+            int hllShardBase = getHLLShardBase(cubeSeg);
+            FactDistinctColumnPartitioner.setHLLShard(job.getConfiguration(), hllShardBase);
+            numberOfReducers += (1 + hllShardBase);
+        }
         job.setReducerClass(FactDistinctColumnsReducer.class);
         job.setPartitionerClass(FactDistinctColumnPartitioner.class);
         job.setNumReduceTasks(numberOfReducers);

http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 0f65a3e..a733430 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -107,24 +107,38 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         uhcReducerCount = cube.getConfig().getUHCReducerCount();
         initReducerIdToColumnIndex(config);
 
-        if (collectStatistics && (taskId == numberOfTasks - 1)) {
-            // hll
-            isStatistics = true;
-            baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
-            baseCuboidRowCountInMappers = Lists.newArrayList();
-            cuboidHLLMap = Maps.newHashMap();
-            samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
-            logger.info("Reducer " + taskId + " handling stats");
-        } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
-            // partition col
-            isPartitionCol = true;
-            col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (col == null) {
-                logger.info("No partition col. This reducer will do nothing");
+        boolean ifCol = true;
+        if (collectStatistics) {
+            int hllShardBase = conf.getInt(FactDistinctColumnPartitioner.HLL_SHARD_BASE_PROPERTY_NAME, 0);
+            if (hllShardBase <= 0) {
+                throw new IllegalArgumentException("In job configuration the value for property "
+                        + FactDistinctColumnPartitioner.HLL_SHARD_BASE_PROPERTY_NAME + " is " + hllShardBase
+                        + ". It should be set correctly!!!");
+            }
+            ifCol = false;
+            if (taskId >= numberOfTasks - hllShardBase) {
+                // hll
+                isStatistics = true;
+                baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+                baseCuboidRowCountInMappers = Lists.newArrayList();
+                cuboidHLLMap = Maps.newHashMap();
+                samplingPercentage = Integer
+                        .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+                logger.info("Reducer " + taskId + " handling stats");
+            } else if (taskId == numberOfTasks - hllShardBase - 1) {
+                // partition col
+                isPartitionCol = true;
+                col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+                if (col == null) {
+                    logger.info("No partition col. This reducer will do nothing");
+                } else {
+                    logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity());
+                }
             } else {
-                logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity());
+                ifCol = true;
             }
-        } else {
+        }
+        if (ifCol) {
             // normal col
             col = columnList.get(reducerIdToColumnIndex.get(taskId));
             Preconditions.checkNotNull(col);
@@ -291,7 +305,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     }
 
     private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
-        logger.info("Total cuboid number: \t" + allCuboids.size());
+        logger.info("Cuboid number for task: " + taskId + "\t" + allCuboids.size());
         logger.info("Samping percentage: \t" + samplingPercentage);
         logger.info("The following statistics are collected based on sampling data.");
         logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
@@ -308,11 +322,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
         }
 
-        logger.info("Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge);
-        logger.info("After merge, the cube has row count: \t " + grantTotal);
-        if (grantTotal > 0) {
-            logger.info("The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal);
-        }
+        logger.info("Sum of row counts (before merge) is: \t " + totalRowsBeforeMerge);
+        logger.info("After merge, the row count: \t " + grantTotal);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/377ec491/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 2196f09..f69bf67 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -19,25 +19,40 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
 import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * Save the cube segment statistic to Kylin metadata store
  */
@@ -56,14 +71,79 @@ public class SaveStatisticsStep extends AbstractExecutable {
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
+
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
             Path statisticsDir = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()));
-            FileSystem fs = HadoopUtil.getFileSystem(statisticsDir);
-            Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS);
-            if (statisticsFilePath == null) {
+            Path[] statisticsFiles = HadoopUtil.getFilterPath(fs, statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS);
+            if (statisticsFiles == null) {
                 throw new IOException("fail to find the statistics file in base dir: " + statisticsDir);
             }
 
-            FSDataInputStream is = fs.open(statisticsFilePath);
+            Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+            long totalRowsBeforeMerge = 0;
+            long grantTotal = 0;
+            int samplingPercentage = -1;
+            int mapperNumber = -1;
+            for (Path item : statisticsFiles) {
+                int pSamplingPercentage = 0;
+                double pMapperOverlapRatio = 0;
+                int pMapperNumber = 0;
+                long pGrantTotal = 0;
+                try (SequenceFile.Reader reader = new SequenceFile.Reader(hadoopConf, SequenceFile.Reader.file(item))) {
+                    LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
+                    BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(),
+                            hadoopConf);
+                    while (reader.next(key, value)) {
+                        if (key.get() == 0L) {
+                            pSamplingPercentage = Bytes.toInt(value.getBytes());
+                        } else if (key.get() == -1L) {
+                            pMapperOverlapRatio = Bytes.toDouble(value.getBytes());
+                        } else if (key.get() == -2L) {
+                            pMapperNumber = Bytes.toInt(value.getBytes());
+                        } else {
+                            HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision());
+                            ByteArray byteArray = new ByteArray(value.getBytes());
+                            hll.readRegisters(byteArray.asBuffer());
+                            cuboidHLLMap.put(key.get(), hll);
+                            pGrantTotal += hll.getCountEstimate();
+                        }
+                    }
+                    totalRowsBeforeMerge += pGrantTotal * pMapperOverlapRatio;
+                    grantTotal += pGrantTotal;
+                    if (pMapperNumber > 0) {
+                        if (mapperNumber < 0) {
+                            mapperNumber = pMapperNumber;
+                        } else {
+                            throw new RuntimeException(
+                                    "Base cuboid has been distributed to multiple reducers at step FactDistinctColumnsReducer!!!");
+                        }
+                    }
+                    if (samplingPercentage < 0) {
+                        samplingPercentage = pSamplingPercentage;
+                    } else if (samplingPercentage != pSamplingPercentage) {
+                        throw new RuntimeException(
+                                "The sampling percentage should be same among all of the reducer of FactDistinctColumnsReducer!!!");
+                    }
+                }
+            }
+            if (samplingPercentage < 0) {
+                logger.warn("The sampling percentage should be set!!!");
+            }
+            if (mapperNumber < 0) {
+                logger.warn("The mapper number should be set!!!");
+            }
+
+            if (logger.isDebugEnabled()) {
+                logMapperAndCuboidStatistics(cuboidHLLMap, samplingPercentage, mapperNumber, grantTotal,
+                        totalRowsBeforeMerge);
+            }
+            double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;
+            CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,
+                    mapperNumber, mapperOverlapRatio);
+
+            Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            FSDataInputStream is = fs.open(statisticsFile);
             try {
                 // put the statistics to metadata store
                 String statisticsFileName = newSegment.getStatisticsResourcePath();
@@ -84,4 +164,23 @@
         }
     }
 
+    private void logMapperAndCuboidStatistics(Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage,
+            int mapperNumber, long grantTotal, long totalRowsBeforeMerge) throws IOException {
+        logger.debug("Total cuboid number: \t" + cuboidHLLMap.size());
+        logger.debug("Samping percentage: \t" + samplingPercentage);
+        logger.debug("The following statistics are collected based on sampling data.");
+        logger.debug("Number of Mappers: " + mapperNumber);
+
+        List<Long> allCuboids = Lists.newArrayList(cuboidHLLMap.keySet());
+        Collections.sort(allCuboids);
+        for (long i : allCuboids) {
+            logger.debug("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+        }
+
+        logger.debug("Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge);
+        logger.debug("After merge, the cube has row count: \t " + grantTotal);
+        if (grantTotal > 0) {
+            logger.debug("The mapper overlap ratio is: \t" + (double) totalRowsBeforeMerge / grantTotal);
+        }
+    }
 }
