This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d1ed1072728ca0bd12b83a04e2308784b2e045a6 Author: ZhansShaoxiong <shaoxiong0...@gmail.com> AuthorDate: Tue Jun 26 02:07:42 2018 +0800 KYLIN-3423 Performance improvement in FactDistinctColumnsMapper Signed-off-by: shaofengshi <shaofeng...@apache.org> --- .../org/apache/kylin/cube/DimensionRangeInfo.java | 8 ++ .../engine/mr/steps/FactDistinctColumnsMapper.java | 127 +++++++++++++++++---- .../mr/steps/FactDistinctColumnsMapperBase.java | 11 +- .../FactDistinctColumnsReducerMappingTest.java | 4 +- .../kylin/engine/mr/steps/DictColDeduperTest.java | 65 +++++++++++ 5 files changed, 185 insertions(+), 30 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java index e36ca96..0b0d1c4 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java @@ -92,9 +92,17 @@ public class DimensionRangeInfo { return min; } + public void setMin(String min) { + this.min = min; + } + public String getMax() { return max; } + + public void setMax(String max) { + this.max = max; + } @Override public String toString() { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index fc9dc65..7bffce7 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -29,7 +31,9 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinVersion; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.cube.DimensionRangeInfo; import org.apache.kylin.cube.cuboid.CuboidUtil; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil; @@ -37,9 +41,11 @@ import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.measure.hllc.RegisterType; import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; @@ -62,6 +68,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB private int rowCount = 0; private int samplingPercentage; private ByteBuffer tmpbuf; + + private DictColDeduper dictColDeduper; + private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap(); private CuboidStatCalculator[] cuboidStatCalculators; @@ -132,6 +141,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB cuboidStatCalculators[i] = calculator; calculator.start(); } + + // setup dict col deduper + dictColDeduper = new DictColDeduper(); + Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt(); + for (int i = 0; i < allCols.size(); i++) { + if (dictCols.contains(allCols.get(i))) + dictColDeduper.setIsDictCol(i); + } } private int getStatsThreadNum(int cuboidNum) { @@ -156,41 +173,42 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB for (String[] row : rowCollection) { context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row)); - for (int i = 0; i < allDimDictCols.size(); i++) { + for (int i = 0; i < allCols.size(); i++) { String fieldValue = row[columnIndex[i]]; if (fieldValue == null) continue; - int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue); - - tmpbuf.clear(); - byte[] valueBytes = Bytes.toBytes(fieldValue); - int size = valueBytes.length + 1; - if (size >= tmpbuf.capacity()) { - tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size)); - } - tmpbuf.put(Bytes.toBytes(reducerIndex)[3]); - tmpbuf.put(valueBytes); - outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); - DataType type = allDimDictCols.get(i).getType(); - sortableKey.init(outputKey, type); - context.write(sortableKey, EMPTY_TEXT); - - // log a few rows for troubleshooting - if (rowCount < 10) { - logger.info( - "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); + final DataType type = allCols.get(i).getType(); + + //for dic column, de dup before write value; for dim not dic column, hold util doCleanup() + if (dictColDeduper.isDictCol(i)) { + if (dictColDeduper.add(i, fieldValue)) { + writeFieldValue(context, type, i, fieldValue); + } + } else { + DimensionRangeInfo old = dimensionRangeInfoMap.get(i); + if (old == null) { + old = new DimensionRangeInfo(fieldValue, fieldValue); + dimensionRangeInfoMap.put(i, old); + } else { + old.setMax(type.getOrder().max(old.getMax(), fieldValue)); + old.setMin(type.getOrder().min(old.getMin(), fieldValue)); + } } } if (rowCount % 100 < samplingPercentage) { putRowKeyToHLL(row); } + + if (rowCount % 100 == 0) { + dictColDeduper.resetIfShortOfMem(); + } rowCount++; } } - + private void putRowKeyToHLL(String[] row) { for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) { cuboidStatCalculator.putRow(row); @@ -231,6 +249,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB context.write(sortableKey, outputValue); } } + for (Integer colIndex : dimensionRangeInfoMap.keySet()) { + DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex); + DataType dataType = allCols.get(colIndex).getType(); + writeFieldValue(context, dataType, colIndex, rangeInfo.getMin()); + writeFieldValue(context, dataType, colIndex, rangeInfo.getMax()); + } } private int countNewSize(int oldSize, int dataSize) { @@ -241,6 +265,26 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB return newSize; } + private void writeFieldValue(Context context, DataType type, Integer colIndex, String value) + throws IOException, InterruptedException { + int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value); + tmpbuf.clear(); + byte[] valueBytes = Bytes.toBytes(value); + int size = valueBytes.length + 1; + if (size >= tmpbuf.capacity()) { + tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size)); + } + tmpbuf.put(Bytes.toBytes(reducerIndex)[3]); + tmpbuf.put(valueBytes); + outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); + sortableKey.init(outputKey, type); + context.write(sortableKey, EMPTY_TEXT); + // log a few rows for troubleshooting + if (rowCount < 10) { + logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex); + } + } + public static class CuboidStatCalculator implements Runnable { private final int id; private final int nRowKey; @@ -371,4 +415,45 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB } } } + + public static class DictColDeduper { + + final boolean enabled; + final int resetThresholdMB; + final Map<Integer, Set<String>> colValueSets = Maps.newHashMap(); + + public DictColDeduper() { + this(200, 100); + } + + public DictColDeduper(int enableThresholdMB, int resetThresholdMB) { + // only enable when there is sufficient memory + this.enabled = MemoryBudgetController.getSystemAvailMB() >= enableThresholdMB; + this.resetThresholdMB = resetThresholdMB; + } + + public void setIsDictCol(int i) { + colValueSets.put(i, new HashSet<String>()); + } + + public boolean isDictCol(int i) { + return colValueSets.containsKey(i); + } + + public boolean add(int i, String fieldValue) { + return colValueSets.get(i).add(fieldValue); + } + + public Set<String> getValueSet(int i) { + return colValueSets.get(i); + } + + public void resetIfShortOfMem() { + if (MemoryBudgetController.getSystemAvailMB() < resetThresholdMB) { + for (Set<String> set : colValueSets.values()) + set.clear(); + } + } + + } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index ceddeb5..ad9030c 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -49,10 +49,9 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli protected CubeDesc cubeDesc; protected long baseCuboidId; protected IMRTableInputFormat flatTableInputFormat; - protected List<TblColRef> allDimDictCols; + protected List<TblColRef> allCols; protected Text outputKey = new Text(); - //protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(); protected Text outputValue = new Text(); protected int errorRecordCounter = 0; @@ -73,14 +72,14 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli cubeDesc = cube.getDescriptor(); baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); reducerMapping = new FactDistinctColumnsReducerMapping(cube); - allDimDictCols = reducerMapping.getAllDimDictCols(); + allCols = reducerMapping.getAllDimDictCols(); flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc); - columnIndex = new int[allDimDictCols.size()]; - for (int i = 0; i < allDimDictCols.size(); i++) { - TblColRef colRef = allDimDictCols.get(i); + columnIndex = new int[allCols.size()]; + for (int i = 0; i < allCols.size(); i++) { + TblColRef colRef = allCols.get(i); int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef); columnIndex[i] = columnIndexOnFlatTbl; } diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java index 3c58b26..9a00d55 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java @@ -54,9 +54,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest TblColRef aUHC = cube.getModel().findColumn("TEST_COUNT_DISTINCT_BITMAP"); FactDistinctColumnsReducerMapping mapping = new FactDistinctColumnsReducerMapping(cube); - //System.out.println(mapping.getAllDictCols()); - //System.out.println(Arrays.toString(mapping.getAllRolePlaysForReducers())); - + int totalReducerNum = mapping.getTotalReducerNum(); Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum()); diff --git a/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java b/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java new file mode 100644 index 0000000..1634843 --- /dev/null +++ b/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class DictColDeduperTest { + + @Test + public void testBasics() { + DictColDeduper dd = new DictColDeduper(50, 0); + + dd.setIsDictCol(0); + dd.setIsDictCol(2); + + Assert.assertTrue(dd.isDictCol(0)); + Assert.assertTrue(!dd.isDictCol(1)); + Assert.assertTrue(dd.isDictCol(2)); + Assert.assertTrue(!dd.isDictCol(3)); + + Assert.assertTrue(dd.add(0, "abc")); + dd.resetIfShortOfMem(); + Assert.assertTrue(!dd.add(0, "abc")); + + Assert.assertTrue(dd.add(2, "abc")); + dd.resetIfShortOfMem(); + Assert.assertTrue(!dd.add(2, "abc")); + } + + @Test + public void testReset() { + DictColDeduper dd = new DictColDeduper(50, Integer.MAX_VALUE); + + dd.setIsDictCol(0); + dd.setIsDictCol(2); + + Assert.assertTrue(dd.add(0, "abc")); + Assert.assertTrue(dd.add(2, "abc")); + + dd.resetIfShortOfMem(); + + Assert.assertTrue(dd.add(0, "abc")); + Assert.assertTrue(dd.add(2, "abc")); + } +}