KYLIN-2135 Enlarge FactDistinctColumns reducer number Signed-off-by: shaofengshi <shaofeng...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/74214030 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/74214030 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/74214030 Branch: refs/heads/KYLIN-2006 Commit: 74214030272ffef275ccf0359b583b3278aec468 Parents: 47de961 Author: kangkaisen <kangkai...@live.com> Authored: Wed Oct 26 19:35:20 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Nov 8 21:29:22 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 5 ++ .../java/org/apache/kylin/cube/CubeManager.java | 35 ++++++++++++ .../kylin/engine/mr/DFSFileTableReader.java | 59 ++++++++++++++++---- .../kylin/engine/mr/common/BatchConstants.java | 5 ++ .../mr/steps/FactDistinctColumnPartitioner.java | 11 +--- .../engine/mr/steps/FactDistinctColumnsJob.java | 18 +++++- .../mr/steps/FactDistinctColumnsMapperBase.java | 17 +++++- .../mr/steps/FactDistinctColumnsReducer.java | 34 ++++++++++- .../mr/steps/FactDistinctHiveColumnsMapper.java | 12 +++- 9 files changed, 170 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index d9d10bb..6d3e807 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -374,6 +374,11 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true")); } + //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns + public int getUHCReducerCount() { + return Integer.parseInt(getOptional("kylin.job.uhc.reducer.count", "3")); + } + public String getOverrideHiveTableLocation(String table) { return getOptional("hive.table.location." + table.toUpperCase()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 87bb93d..9893040 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -31,6 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -43,6 +44,7 @@ import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.DictionaryDesc; import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; @@ -1049,4 +1051,37 @@ public class CubeManager implements IRealizationProvider { } return holes; } + + private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; + + //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns + public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { + List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc); + int[] uhcIndex = new int[factDictCols.size()]; + + //add GlobalDictionaryColumns + List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries(); + if (dictionaryDescList != null) { + for (DictionaryDesc dictionaryDesc : dictionaryDescList) { + if (dictionaryDesc.getBuilderClass() != null && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) { + for (int i = 0; i < factDictCols.size(); i++) { + if (factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) { + uhcIndex[i] = 1; + break; + } + } + } + } + } + + //add ShardByColumns + Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns(); + for (int i = 0; i < factDictCols.size(); i++) { + if (shardByColumns.contains(factDictCols.get(i))) { + uhcIndex[i] = 1; + } + } + + return uhcIndex; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java index 300b123..dda1d6f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java @@ -23,10 +23,14 @@ import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.apache.commons.lang.StringEscapeUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -53,7 +57,7 @@ public class DFSFileTableReader implements TableReader { private String filePath; private String delim; - private RowReader reader; + private List<RowReader> readerList; private String curLine; private String[] curColumns; @@ -68,17 +72,33 @@ public class DFSFileTableReader implements TableReader { this.filePath = filePath; this.delim = delim; this.expectedColumnNumber = expectedColumnNumber; + this.readerList = new ArrayList<RowReader>(); FileSystem fs = HadoopUtil.getFileSystem(filePath); - try { - this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath); - + ArrayList<FileStatus> allFiles = new ArrayList<>(); + FileStatus status = fs.getFileStatus(new Path(filePath)); + if (status.isFile()) { + allFiles.add(status); + } else { + FileStatus[] listStatus = fs.listStatus(new Path(filePath)); + allFiles.addAll(Arrays.asList(listStatus)); + } + + try { + for (FileStatus f : allFiles) { + RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString()); + this.readerList.add(rowReader); + } } catch (IOException e) { if (isExceptionSayingNotSeqFile(e) == false) throw e; - this.reader = new CsvRowReader(fs, filePath); + this.readerList = new ArrayList<RowReader>(); + for (FileStatus f : allFiles) { + RowReader rowReader = new CsvRowReader(fs, f.getPath().toString()); + this.readerList.add(rowReader); + } } } @@ -94,9 +114,20 @@ public class DFSFileTableReader implements TableReader { @Override public boolean next() throws IOException { - curLine = reader.nextLine(); - curColumns = null; - return curLine != null; + int curReaderIndex = -1; + RowReader curReader; + + while (++curReaderIndex < readerList.size()) { + curReader = readerList.get(curReaderIndex); + curLine = curReader.nextLine(); + curColumns = null; + + if (curLine != null) { + return true; + } + } + + return false; } public String getLine() { @@ -145,9 +176,15 @@ public class DFSFileTableReader implements TableReader { } @Override - public void close() throws IOException { - if (reader != null) - reader.close(); + public void close() { + for (RowReader reader : readerList) { + try { + if (reader != null) + reader.close(); + } catch (IOException e) { + logger.warn("close file failed:", e); + } + } } private String autoDetectDelim(String line) { http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index e4a8808..078d80f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -81,4 +81,9 @@ public interface BatchConstants { String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder"; int NORMAL_RECORD_LOG_THRESHOLD = 100000; int ERROR_RECORD_LOG_THRESHOLD = 100; + + /** + * dictionaries builder class + */ + String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; } http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java index 6973c4b..b36e422 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java @@ -18,7 +18,6 @@ package org.apache.kylin.engine.mr.steps; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.kylin.common.util.BytesUtil; @@ -26,22 +25,16 @@ import org.apache.kylin.common.util.BytesUtil; /** */ public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> { - private Configuration conf; - @Override public int getPartition(Text key, Text value, int numReduceTasks) { - if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) { // the last reducer is for merging hll return numReduceTasks - 1; } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) { - // the last reducer is for merging hll + // the last but one reducer is for partition col return numReduceTasks - 2; } else { - int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1); - return colIndex; + return BytesUtil.readUnsigned(key.getBytes(), 0, 1); } - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index 28ee335..92da7d1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -78,11 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { CubeInstance cube = cubeMgr.getCube(cubeName); List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor()); + int reducerCount = columnsNeedDict.size(); + int uhcReducerCount = cube.getConfig().getUHCReducerCount(); + + int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor()); + for(int index : uhcIndex) { + if(index == 1) { + reducerCount += uhcReducerCount - 1; + } + } + + if (reducerCount > 255) { + throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.global.dictionary.column.reducer.count' "); + } + + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled); job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output); job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent); + logger.info("Starting: " + job.getJobName()); setJobClasspath(job, cube.getConfig()); @@ -98,7 +114,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { logger.info("Found segment: " + segment); } setupMapper(cube.getSegmentById(segmentID)); - setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size()); + setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount); attachKylinPropsAndMetadata(cube, job.getConfiguration()); http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index 3fa966d..196bf1e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -20,7 +20,9 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -58,6 +60,10 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K protected CubeJoinedFlatTableEnrich intermediateTableDesc; protected int[] dictionaryColumnIndex; + protected int uhcReducerCount; + protected int[] uhcIndex; + protected Map<Integer, Integer> columnIndexToReducerBeginId = new HashMap<>(); + @Override protected void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); @@ -73,7 +79,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); - intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc); + intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc); dictionaryColumnIndex = new int[factDictCols.size()]; for (int i = 0; i < factDictCols.size(); i++) { TblColRef colRef = factDictCols.get(i); @@ -81,6 +87,15 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K dictionaryColumnIndex[i] = columnIndexOnFlatTbl; } + uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc); + uhcReducerCount = cube.getConfig().getUHCReducerCount(); + int count = 0; + for (int i = 0; i < uhcIndex.length; i++) { + columnIndexToReducerBeginId.put(i, count * (uhcReducerCount - 1) + i); + if (uhcIndex[i] == 1) { + count++; + } + } } protected void handleErrorRecord(String[] record, Exception ex) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index ecbc6c2..5b00381 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -67,6 +68,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri private boolean isStatistics = false; private boolean isPartitionCol = false; private KylinConfig cubeConfig; + private int uhcReducerCount; + private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>(); + private int taskId; + protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class); @Override @@ -83,7 +88,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED)); int numberOfTasks = context.getNumReduceTasks(); - int taskId = context.getTaskAttemptID().getTaskID().getId(); + taskId = context.getTaskAttemptID().getTaskID().getId(); + + uhcReducerCount = cube.getConfig().getUHCReducerCount(); + initReducerIdToColumnIndex(config); if (collectStatistics && (taskId == numberOfTasks - 1)) { // hll @@ -102,11 +110,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri // col isStatistics = false; isPartitionCol = false; - col = columnList.get(taskId); + col = columnList.get(ReducerIdToColumnIndex.get(taskId)); colValues = Lists.newLinkedList(); } } + private void initReducerIdToColumnIndex(KylinConfig config) throws IOException { + int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc); + int count = 0; + for (int i = 0; i < uhcIndex.length; i++) { + ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i); + if (uhcIndex[i] == 1) { + for (int j = 1; j < uhcReducerCount; j++) { + ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + j + i, i); + } + count++; + } + } + } + @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { @@ -153,10 +175,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri final Configuration conf = context.getConfiguration(); final FileSystem fs = FileSystem.get(conf); final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH); - final Path outputFile = new Path(outputPath, col.getName()); + final Path colDir = new Path(outputPath, col.getName()); + final String fileName = col.getName() + "-" + taskId % uhcReducerCount; + final Path outputFile = new Path(colDir, fileName); FSDataOutputStream out = null; try { + if (!fs.exists(colDir)) { + fs.mkdirs(colDir); + } + if (fs.exists(outputFile)) { out = fs.append(outputFile); logger.info("append file " + outputFile); http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 177c9f6..7a183b8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -141,7 +141,17 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap if (fieldValue == null) continue; int offset = keyBuffer.position(); - keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough + + int reducerIndex; + if (uhcIndex[i] == 0) { + //for the normal dictionary column + reducerIndex = columnIndexToReducerBeginId.get(i); + } else { + //for the uhc + reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount; + } + + keyBuffer.put(Bytes.toBytes(reducerIndex)[3]); keyBuffer.put(Bytes.toBytes(fieldValue)); outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset); context.write(outputKey, EMPTY_TEXT);