Repository: kylin Updated Branches: refs/heads/KYLIN-2217-2 cf8eb696a -> 44eab9187
cv123 Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/44eab918 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/44eab918 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/44eab918 Branch: refs/heads/KYLIN-2217-2 Commit: 44eab91870ea2b4b888d8fa3a858080d36406f90 Parents: cf8eb69 Author: Li Yang <liy...@apache.org> Authored: Wed Nov 30 16:58:27 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Wed Nov 30 16:58:27 2016 +0800 ---------------------------------------------------------------------- .../kylin/dict/DictionaryProviderTest.java | 9 +- .../mr/steps/FactDistinctColumnsReducer.java | 98 +++++++------------- .../mr/steps/FactDistinctHiveColumnsMapper.java | 9 +- 3 files changed, 38 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/44eab918/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java index 84b1080..a4aee76 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java @@ -8,7 +8,6 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.IOException; import java.util.Arrays; import java.util.Iterator; @@ -83,15 +82,9 @@ public class DictionaryProviderTest { String dictClassName2 = in.readUTF(); dict2 = (Dictionary<String>) ClassUtil.newInstance(dictClassName2); dict2.readFields(in); - } catch (IOException e) { - e.printStackTrace(); } finally { if (in != null) { - try { - in.close(); - } catch (IOException e) { - e.printStackTrace(); - } + in.close(); } } assertTrue(dict.equals(dict2)); http://git-wip-us.apache.org/repos/asf/kylin/blob/44eab918/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index af57a07..8933ee2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -27,8 +27,6 @@ import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.FastDateFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -50,7 +48,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; -import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,11 +80,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK //local build dict private boolean isReducerLocalBuildDict; private IDictionaryBuilder builder; - private FastDateFormat dateFormat; private long timeMaxValue = Long.MIN_VALUE; private long timeMinValue = Long.MAX_VALUE; - public static final String DICT_FILE_POSTFIX = ".RLD"; - public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".PCI"; + public static final String DICT_FILE_POSTFIX = ".rldict"; + public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci"; private boolean isPartitionCol = false; @Override @@ -121,34 +117,19 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK isPartitionCol = true; col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); colValues = Lists.newLinkedList(); - DataType partitionColType = col.getType(); - if (partitionColType.isDate()) { - dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN); - } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { - dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS); - } else if (partitionColType.isStringFamily()) { - String partitionDateFormat = cubeDesc.getModel().getPartitionDesc().getPartitionDateFormat(); - if (StringUtils.isEmpty(partitionDateFormat)) { - partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN; - } - dateFormat = DateFormat.getDateFormat(partitionDateFormat); - } else { - throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type"); - } } else { - // col + // normal col isStatistics = false; col = columnList.get(reducerIdToColumnIndex.get(taskId)); colValues = Lists.newLinkedList(); + + // local build dict + isReducerLocalBuildDict = config.isReducerLocalBuildDict(); + if (col != null && isReducerLocalBuildDict) { + builder = DictionaryGenerator.newDictionaryBuilder(col.getType()); + builder.init(null, 0); + } } - - //local build dict - isReducerLocalBuildDict = config.isReducerLocalBuildDict(); - if (col != null && isReducerLocalBuildDict) { - builder = DictionaryGenerator.newDictionaryBuilder(col.getType()); - builder.init(null, 0); - } - } private void initReducerIdToColumnIndex(KylinConfig config) throws IOException { @@ -168,7 +149,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK @Override public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text key = skey.getText(); - if (isStatistics == true) { + if (isStatistics) { // for hll long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG); for (Text value : values) { @@ -188,20 +169,17 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK cuboidHLLMap.put(cuboidId, hll); } } + } else if (isPartitionCol) { + // partition col + String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); + long time = DateFormat.stringToMillis(value); + timeMinValue = Math.min(timeMinValue, time); + timeMaxValue = Math.max(timeMaxValue, time); } else { + // normal col if (isReducerLocalBuildDict) { - String value = new String(key.getBytes(), 1, key.getLength() - 1); - //partition col - try { - if (isPartitionCol) { - long time = dateFormat.parse(value).getTime(); - timeMinValue = Math.min(timeMinValue, time); - timeMaxValue = Math.max(timeMaxValue, time); - } - builder.addValue(value); - } catch (Exception e) { - e.printStackTrace(); - } + String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); + builder.addValue(value); } else { colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); if (colValues.size() == 1000000) { //spill every 1 million @@ -211,7 +189,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK } } } - } private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException { @@ -287,25 +264,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK @Override protected void doCleanup(Context context) throws IOException, InterruptedException { - if (isStatistics == false) { - if (isReducerLocalBuildDict) { - try { - if (isPartitionCol) { - outputPartitionInfo(context); - } - Dictionary<String> dict = builder.build(); - outputDict(col, dict, context); - } catch (Exception e) { - e.printStackTrace(); - } - } else { - if (colValues.size() > 0) { - outputDistinctValues(col, colValues, context); - colValues.clear(); - } - } - } else { - //output the hll info; + if (isStatistics) { + // output the hll info long grandTotal = 0; for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { grandTotal += hll.getCountEstimate(); @@ -317,6 +277,20 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK writeMapperAndCuboidStatistics(context); // for human check CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); + } else if (isPartitionCol) { + // partition col + outputPartitionInfo(context); + } else { + // normal col + if (isReducerLocalBuildDict) { + Dictionary<String> dict = builder.build(); + outputDict(col, dict, context); + } else { + if (colValues.size() > 0) { + outputDistinctValues(col, colValues, context); + colValues.clear(); + } + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/44eab918/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 762047b..a5c8fc0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -97,15 +97,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap // if partition col not on cube, no need needFetchPartitionCol = false; } else { - for (int x : dictionaryColumnIndex) { - if (x == partitionColumnIndex) { - // if partition col already build dict, no need - needFetchPartitionCol = false; - break; - } - } + needFetchPartitionCol = true; } - } }