http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java index fbb258f..42946eb 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java @@ -166,7 +166,7 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> { List<Map<TblColRef, String>> fuzzyValues = FuzzyValueCombination.calculate(fuzzyValueSet, FUZZY_VALUE_CAP); for (Map<TblColRef, String> fuzzyValue : fuzzyValues) { - result.add(new Pair<byte[], byte[]>(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue))); + result.add(Pair.newPair(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue))); } return result; }
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java index 152b4af..f2b34fc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java @@ -94,6 +94,6 @@ public class DFSFileTable implements ReadableTable { lastModified = Math.max(lastModified, file.getModificationTime()); } - return new Pair<Long, Long>(size, lastModified); + return Pair.newPair(size, lastModified); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 45cc88e..3dddece 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -157,8 +157,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL int colParamIdx = 0; // index among parameters of column type for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { String value; - if (function.isCount() || function.isHolisticCountDistinct()) { - // note for holistic count distinct, this value will be ignored + if (function.isCount()) { value = "1"; } else if (param.isColumnType()) { value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers); http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index 3edaefb..daa610f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -170,9 +170,8 @@ public class CuboidJob extends AbstractHadoopJob { // number of reduce tasks int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio); - // adjust reducer number for cube which has DISTINCT_COUNT measures for - // better performance - if (cubeDesc.hasHolisticCountDistinctMeasures()) { + // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance + if (cubeDesc.hasMemoryHungryMeasures()) { numReduceTasks = numReduceTasks * 4; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java index 070eb97..e7de9ff 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java @@ -124,7 +124,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By MeasureDesc measureDesc = measureDescs.get(i); MeasureType measureType = measureDesc.getFunction().getMeasureType(); if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) { - dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester())); + dictMeasures.add(Pair.newPair(i, measureType.newIngester())); } } if (dictMeasures.size() > 0) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java index b3f7f54..0bbf012 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java @@ -126,7 +126,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> { MeasureDesc measureDesc = measureDescs.get(i); MeasureType measureType = measureDesc.getFunction().getMeasureType(); if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) { - dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester())); + dictMeasures.add(Pair.newPair(i, measureType.newIngester())); } } if (dictMeasures.size() > 0) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 99b85e9..fe29d2a 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -63,6 +63,7 @@ import org.apache.kylin.common.util.AbstractApplication; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.measure.MeasureAggregators; import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -325,11 +326,11 @@ public class SparkCubing extends AbstractApplication { } for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { - if (measureDesc.getFunction().isTopN()) { - List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs(); - TblColRef col = colRefs.get(colRefs.size() - 1); - dictionaryMap.put(col, cubeSegment.getDictionary(col)); - } + FunctionDesc func = measureDesc.getFunction(); + List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func); + for (TblColRef col : colRefs) { + dictionaryMap.put(col, cubeSegment.getDictionary(col)); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java ---------------------------------------------------------------------- diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java index c9a78a4..bd952a1 100644 --- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java +++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java @@ -20,12 +20,13 @@ package org.apache.kylin.invertedindex.measure; import java.nio.ByteBuffer; +import org.apache.kylin.measure.hllc.HLLCMeasureType; import org.apache.kylin.metadata.datatype.DataType; abstract public class FixedLenMeasureCodec<T> { public static FixedLenMeasureCodec<?> get(DataType type) { - if (type.isHLLC()) { + if (HLLCMeasureType.DATATYPE_HLLC.equals(type.getName())) { return new FixedHLLCodec(type); } else { return new FixedPointLongCodec(type); http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java deleted file mode 100644 index 16748a6..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1; - -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.Tuple; -import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.hbase.steps.RowValueDecoder; -import org.apache.kylin.storage.translate.HBaseKeyRange; - -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -/** - */ -public class CubeSegmentTopNTupleIterator extends CubeSegmentTupleIterator{ - - private Iterator<Tuple> innerResultIterator; - - public CubeSegmentTopNTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, // - Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, // - List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) { - super(cubeSeg, keyRanges, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo); - this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, topNCol); - } - - @Override - public boolean hasNext() { - if (next != null) - return true; - - - if (innerResultIterator == null) { - if (resultIterator == null) { - if (rangeIterator.hasNext() == false) - return false; - - resultIterator = doScan(rangeIterator.next()); - } - - if (resultIterator.hasNext() == false) { - closeScanner(); - resultIterator = null; - innerResultIterator = null; - return hasNext(); - } - - Result result = resultIterator.next(); - scanCount++; - if (++scanCountDelta >= 1000) - flushScanCountDelta(); - innerResultIterator = tupleConverter.translateTopNResult(result, oneTuple); - } - - if (innerResultIterator.hasNext()) { - next = innerResultIterator.next(); - return true; - } else { - innerResultIterator = null; - return hasNext(); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java index 4acd8f8..50cfefa 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java @@ -99,7 +99,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { assert cuboid.equals(range.getCuboid()); } - this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, null); + this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo); this.oneTuple = new Tuple(returnTupleInfo); this.rangeIterator = keyRanges.iterator(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index 13c75de..49ea71d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -48,6 +48,7 @@ import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.cube.model.HBaseColumnDesc; import org.apache.kylin.cube.model.HBaseMappingDesc; import org.apache.kylin.dict.lookup.LookupStringTable; +import org.apache.kylin.measure.MeasureType; import org.apache.kylin.metadata.filter.ColumnTupleFilter; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.LogicalTupleFilter; @@ -96,9 +97,8 @@ public class CubeStorageQuery implements ICachableStorageQuery { @Override public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { - - // check whether this is a TopN query - checkAndRewriteTopN(sqlDigest); + // allow custom measures hack + notifyBeforeStorageQuery(sqlDigest); Collection<TblColRef> groups = sqlDigest.groupbyColumns; TupleFilter filter = sqlDigest.filter; @@ -379,12 +379,8 @@ public class CubeStorageQuery implements ICachableStorageQuery { for (HBaseColumnDesc hbCol : hbCols) { bestHBCol = hbCol; bestIndex = hbCol.findMeasure(aggrFunc); - MeasureDesc measure = hbCol.getMeasures()[bestIndex]; - // criteria for holistic measure: Exact Aggregation && Exact Cuboid - if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) { - logger.info("Holistic count distinct chosen for " + aggrFunc); - break; - } + // we used to prefer specific measure over another (holistic distinct count), now it's gone + break; } RowValueDecoder codec = codecMap.get(bestHBCol); @@ -717,7 +713,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { } private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) { - if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) { + if (RowValueDecoder.hasMemHungryMeasures(valueDecoders) == false) { return; } @@ -750,33 +746,11 @@ public class CubeStorageQuery implements ICachableStorageQuery { ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context); } - private void checkAndRewriteTopN(SQLDigest sqlDigest) { - FunctionDesc topnFunc = null; - TblColRef topnLiteralCol = null; + private void notifyBeforeStorageQuery(SQLDigest sqlDigest) { for (MeasureDesc measure : cubeDesc.getMeasures()) { - FunctionDesc func = measure.getFunction(); - if (func.isTopN() && sqlDigest.groupbyColumns.contains(func.getTopNLiteralColumn())) { - topnFunc = func; - topnLiteralCol = func.getTopNLiteralColumn(); - } - } - - // if TopN is not involved - if (topnFunc == null) - return; - - if (sqlDigest.aggregations.size() != 1) { - throw new IllegalStateException("When query with topN, only one metrics is allowed."); + MeasureType<?> measureType = measure.getFunction().getMeasureType(); + measureType.beforeStorageQuery(measure, sqlDigest); } - - FunctionDesc origFunc = sqlDigest.aggregations.iterator().next(); - if (origFunc.isSum() == false) { - throw new IllegalStateException("When query with topN, only SUM function is allowed."); - } - - sqlDigest.aggregations = Lists.newArrayList(topnFunc); - sqlDigest.groupbyColumns.remove(topnLiteralCol); - sqlDigest.metricColumns.add(topnLiteralCol); - logger.info("Rewrite function " + origFunc + " to " + topnFunc); } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java index 6be89d6..ab92ea9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java @@ -2,18 +2,12 @@ package org.apache.kylin.storage.hbase.cube.v1; import java.io.IOException; import java.util.BitSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.hbase.client.Result; -import org.apache.kylin.common.topn.Counter; -import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; @@ -40,19 +34,14 @@ public class CubeTupleConverter { final int[] dimensionTupleIdx; final int[][] metricsMeasureIdx; final int[][] metricsTupleIdx; - final TblColRef topNCol; - int topNColTupleIdx; - int topNMeasureTupleIdx; - Dictionary<String> topNColDict; - public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo, TblColRef topNCol) { + public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) { this.cubeSeg = cubeSeg; this.cuboid = cuboid; this.tupleInfo = tupleInfo; this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg); this.rowValueDecoders = rowValueDecoders; this.derivedColFillers = Lists.newArrayList(); - this.topNCol = topNCol; List<TblColRef> dimCols = cuboid.getColumns(); @@ -92,13 +81,6 @@ public class CubeTupleConverter { } } - if (this.topNCol != null) { - this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1; - this.topNMeasureTupleIdx = metricsTupleIdx[0][0]; - - this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol); - } - // prepare derived columns and filler Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(dimCols, null); for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) { @@ -112,46 +94,6 @@ public class CubeTupleConverter { } } - public Iterator<Tuple> translateTopNResult(Result hbaseRow, Tuple tuple) { - translateResult(hbaseRow, tuple); - Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx]; - assert (topNCounterObj instanceof TopNCounter); - return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj); - } - - private class TopNCounterTupleIterator implements Iterator { - - private Tuple tuple; - private Iterator<Counter> topNCounterIterator; - private Counter<ByteArray> counter; - - private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) { - this.tuple = tuple; - this.topNCounterIterator = topNCounter.iterator(); - } - - @Override - public boolean hasNext() { - return topNCounterIterator.hasNext(); - } - - @Override - public Tuple next() { - counter = topNCounterIterator.next(); - int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length); - String colValue = topNColDict.getValueFromId(key); - tuple.setDimensionValue(topNColTupleIdx, colValue); - tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount()); - - return tuple; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - public void translateResult(Result hbaseRow, Tuple tuple) { try { byte[] rowkey = hbaseRow.getRow(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java index 3f92cb0..e8dd5b9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.tuple.ITuple; import org.apache.kylin.metadata.tuple.ITupleIterator; @@ -70,7 +69,8 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges); for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) { - this.segmentIteratorList.add(newCubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo)); + CubeSegmentTupleIterator it = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo); + this.segmentIteratorList.add(it); } this.segmentIteratorIterator = this.segmentIteratorList.iterator(); @@ -81,16 +81,6 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { } } - private CubeSegmentTupleIterator newCubeSegmentTupleIterator(CubeSegment seg, List<HBaseKeyRange> keyRange, HConnection conn, Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context2, TupleInfo returnTupleInfo) { - MeasureDesc topN = RowValueDecoder.findTopN(rowValueDecoders); - if (topN != null) { - TblColRef topNCol = topN.getFunction().getTopNLiteralColumn(); - return new CubeSegmentTopNTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo); - } else { - return new CubeSegmentTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo); - } - } - private Map<CubeSegment, List<HBaseKeyRange>> makeRangesMap(List<HBaseKeyRange> segmentKeyRanges) { Map<CubeSegment, List<HBaseKeyRange>> map = Maps.newHashMap(); for (HBaseKeyRange range : segmentKeyRanges) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java index 521f111..5b16b04 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java @@ -32,6 +32,9 @@ import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.HBaseColumnDesc; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureCodec; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants; import org.apache.kylin.storage.hbase.steps.RowValueDecoder; @@ -122,6 +125,8 @@ public class ObserverAggregators { final int nHCols; final ByteBuffer[] hColValues; final int nTotalMeasures; + + MeasureType measureTypes[]; public ObserverAggregators(HCol[] _hcols) { this.hcols = sort(_hcols); @@ -150,11 +155,18 @@ public class ObserverAggregators { } public MeasureAggregator[] createBuffer() { + if (measureTypes == null) { + measureTypes = new MeasureType[nTotalMeasures]; + int i = 0; + for (HCol col : hcols) { + for (int j = 0; j < col.nMeasures; j++) + measureTypes[i++] = MeasureTypeFactory.create(col.funcNames[j], DataType.getType(col.dataTypes[j])); + } + } + MeasureAggregator[] aggrs = new MeasureAggregator[nTotalMeasures]; - int i = 0; - for (HCol col : hcols) { - for (int j = 0; j < col.nMeasures; j++) - aggrs[i++] = MeasureAggregator.create(col.funcNames[j], col.dataTypes[j]); + for (int i = 0; i < nTotalMeasures; i++) { + aggrs[i] = measureTypes[i].newAggregator(); } return aggrs; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java index db05a40..ea33f9a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java @@ -80,7 +80,7 @@ public class FuzzyRowFilterV2 extends FilterBase { for (int i = 0; i < fuzzyKeysData.size(); i++) { p = fuzzyKeysData.get(i); if (p.getFirst().length != p.getSecond().length) { - Pair<String, String> readable = new Pair<String, String>(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p.getSecond())); + Pair<String, String> readable = Pair.newPair(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p.getSecond())); throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable); } // update mask ( 0 -> -1 (0xff), 1 -> 0) @@ -225,7 +225,7 @@ public class FuzzyRowFilterV2 extends FilterBase { void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) { byte[] nextRowKeyCandidate = getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(), currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond()); if (nextRowKeyCandidate != null) { - nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData)); + nextRows.add(Pair.newPair(nextRowKeyCandidate, fuzzyData)); } } @@ -263,7 +263,7 @@ public class FuzzyRowFilterV2 extends FilterBase { FilterProtosExt.BytesBytesPair current = proto.getFuzzyKeysData(i); byte[] keyBytes = current.getFirst().toByteArray(); byte[] keyMeta = current.getSecond().toByteArray(); - fuzzyKeysData.add(new Pair<byte[], byte[]>(keyBytes, keyMeta)); + fuzzyKeysData.add(Pair.newPair(keyBytes, keyMeta)); } return new FuzzyRowFilterV2(fuzzyKeysData); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 412e7602..6a6c887 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -134,7 +134,7 @@ public abstract class CubeHBaseRPC { fuzzyKeyEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyKey); fuzzyMaskEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyMask); - ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask)); + ret.add(Pair.newPair(hbaseFuzzyKey, hbaseFuzzyMask)); } return ret; @@ -153,7 +153,7 @@ public abstract class CubeHBaseRPC { for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) { if (selectedColBlocks.get(colBlkIndex)) { byte[] byteQualifier = Bytes.toBytes(hbaseColDesc.getQualifier()); - result.add(new Pair<byte[], byte[]>(byteFamily, byteQualifier)); + result.add(Pair.newPair(byteFamily, byteQualifier)); } colBlkIndex++; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index 3dbbce1..1ebf0d8 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -54,7 +54,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { @Override public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { - // check whether this is a TopN query + // allow custom measures hack notifyBeforeStorageQuery(sqlDigest); Collection<TblColRef> groups = sqlDigest.groupbyColumns; @@ -88,10 +88,6 @@ public class CubeStorageQuery implements ICachableStorageQuery { boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation); context.setExactAggregation(isExactAggregation); - if (isExactAggregation) { - metrics = replaceHolisticCountDistinct(metrics); - } - // replace derived columns in filter with host columns; columns on loosened condition must be added to group by TupleFilter filterD = translateDerived(filter, groupsD); @@ -113,22 +109,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { if (scanners.isEmpty()) return ITupleIterator.EMPTY_TUPLE_ITERATOR; - return newSequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context); - } - - private ITupleIterator newSequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> dimensionsD, Set<FunctionDesc> metrics, TupleInfo returnTupleInfo, StorageContext context) { - TblColRef topNCol = null; - for (FunctionDesc func : metrics) { - if (func.isTopN()) { - topNCol = func.getTopNLiteralColumn(); - break; - } - } - - if (topNCol != null) - return new SequentialCubeTopNTupleIterator(scanners, cuboid, dimensionsD, topNCol, metrics, returnTupleInfo, context); - else - return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context); + return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context); } private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) { @@ -250,27 +231,6 @@ public class CubeStorageQuery implements ICachableStorageQuery { return exact; } - private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) { - // for count distinct, try use its holistic version if possible - Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>(); - for (FunctionDesc metric : metrics) { - if (metric.isCountDistinct() == false) { - result.add(metric); - continue; - } - - FunctionDesc holisticVersion = null; - for (MeasureDesc measure : cubeDesc.getMeasures()) { - FunctionDesc measureFunc = measure.getFunction(); - if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) { - holisticVersion = measureFunc; - } - } - result.add(holisticVersion == null ? metric : holisticVersion); - } - return result; - } - @SuppressWarnings("unchecked") private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) { if (filter == null) @@ -345,15 +305,13 @@ public class CubeStorageQuery implements ICachableStorageQuery { } private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) { - boolean hasMemHungryCountDistinct = false; + boolean hasMemHungryMeasure = false; for (FunctionDesc func : metrics) { - if (func.isCountDistinct() && !func.isHolisticCountDistinct()) { - hasMemHungryCountDistinct = true; - } + hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry(); } - // need to limit the memory usage for memory hungry count distinct - if (hasMemHungryCountDistinct == false) { + // need to limit the memory usage for memory hungry measures + if (hasMemHungryMeasure == false) { return; } @@ -377,6 +335,13 @@ public class CubeStorageQuery implements ICachableStorageQuery { } } + private void notifyBeforeStorageQuery(SQLDigest sqlDigest) { + for (MeasureDesc measure : cubeDesc.getMeasures()) { + MeasureType<?> measureType = measure.getFunction().getMeasureType(); + measureType.beforeStorageQuery(measure, sqlDigest); + } + } + // ============================================================================ @Override @@ -394,10 +359,4 @@ public class CubeStorageQuery implements ICachableStorageQuery { return false; } - private void notifyBeforeStorageQuery(SQLDigest sqlDigest) { - for (MeasureDesc measure : cubeDesc.getMeasures()) { - MeasureType measureType = measure.getFunction().getMeasureType(); - measureType.beforeStorageQuery(measure, sqlDigest); - } - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java index a9e5a65..0ec382a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java @@ -18,17 +18,12 @@ package org.apache.kylin.storage.hbase.cube.v2; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.apache.kylin.common.topn.Counter; -import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -60,24 +55,19 @@ public class CubeTupleConverter { final int[] gtColIdx; final int[] tupleIdx; final Object[] gtValues; - final MeasureType[] measureTypes; + final MeasureType<?>[] measureTypes; final List<IAdvMeasureFiller> advMeasureFillers; final List<Integer> advMeasureIndexInGTValues; final int nSelectedDims; - final TblColRef topNCol; - int topNColTupleIdx; - int topNMeasureTupleIdx; - Dictionary<String> topNColDict; public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, // - Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, TblColRef topNCol) { + Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) { this.cubeSeg = cubeSeg; this.cuboid = cuboid; this.tupleInfo = returnTupleInfo; this.derivedColFillers = Lists.newArrayList(); - this.topNCol = topNCol; List<TblColRef> cuboidDims = cuboid.getColumns(); CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); @@ -117,7 +107,7 @@ public class CubeTupleConverter { tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; } - MeasureType measureType = metric.getMeasureType(); + MeasureType<?> measureType = metric.getMeasureType(); if (measureType.needAdvancedTupleFilling()) { Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric)); advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap)); @@ -129,14 +119,6 @@ public class CubeTupleConverter { iii++; } - if (this.topNCol != null) { - this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1; - // topN only allow 1 measure - this.topNMeasureTupleIdx = tupleIdx[tupleIdx.length - 1]; - - this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol); - } - // prepare derived columns and filler Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(cuboidDims, null); for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) { @@ -196,13 +178,6 @@ public class CubeTupleConverter { } } - public Iterator<Tuple> translateTopNResult(GTRecord record, Tuple tuple) { - translateResult(record, tuple); - Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx]; - assert (topNCounterObj instanceof TopNCounter); - return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj); - } - private interface IDerivedColumnFiller { public void fillDerivedColumns(Object[] gtValues, Tuple tuple); } @@ -292,38 +267,4 @@ public class CubeTupleConverter { private static String toString(Object o) { return o == null ? null : o.toString(); } - - private class TopNCounterTupleIterator implements Iterator { - - private Tuple tuple; - private Iterator<Counter> topNCounterIterator; - private Counter<ByteArray> counter; - - private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) { - this.tuple = tuple; - this.topNCounterIterator = topNCounter.iterator(); - } - - @Override - public boolean hasNext() { - return topNCounterIterator.hasNext(); - } - - @Override - public Tuple next() { - counter = topNCounterIterator.next(); - int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length); - String colValue = topNColDict.getValueFromId(key); - tuple.setDimensionValue(topNColTupleIdx, colValue); - tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount()); - - return tuple; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java index 303c360..ff7498b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java @@ -26,7 +26,6 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequest; http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java deleted file mode 100644 index 9eeae4e..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.kylin.storage.hbase.cube.v2; - -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.Tuple; -import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.storage.StorageContext; - -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -/** - * Created by shaoshi on 10/28/15. - */ -public class SequentialCubeTopNTupleIterator extends SequentialCubeTupleIterator { - - private Iterator<Tuple> innerResultIterator; - private TblColRef topNCol; - - public SequentialCubeTopNTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, // - TblColRef topNCol, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) { - - super(scanners, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context); - this.topNCol = topNCol; - } - - @Override - public boolean hasNext() { - if (next != null) - return true; - if (innerResultIterator == null) { - if (curScanner == null) { - if (scannerIterator.hasNext()) { - curScanner = scannerIterator.next(); - curRecordIterator = curScanner.iterator(); - curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, topNCol); - } else { - return false; - } - } - - if (curRecordIterator.hasNext()) { - innerResultIterator = curTupleConverter.translateTopNResult(curRecordIterator.next(), tuple); - return hasNext(); - } else { - close(curScanner); - curScanner = null; - curRecordIterator = null; - curTupleConverter = null; - innerResultIterator = null; - return hasNext(); - } - - } - - - if (innerResultIterator.hasNext()) { - next = innerResultIterator.next(); - return true; - } else { - innerResultIterator = null; - return hasNext(); - } - - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java index 5bf5e95..7812baf 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java @@ -75,7 +75,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { if (scannerIterator.hasNext()) { curScanner = scannerIterator.next(); curRecordIterator = curScanner.iterator(); - curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, null); + curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); } else { return false; } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java index 036d9b8..10e80ae 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java @@ -30,6 +30,7 @@ import org.apache.kylin.invertedindex.index.TableRecordInfo; import org.apache.kylin.invertedindex.index.TableRecordInfoDigest; import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec; import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.measure.hllc.HLLCMeasureType; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; @@ -78,7 +79,7 @@ public class EndpointAggregators { } else { int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue()); Preconditions.checkState(index >= 0, "Column " + functionDesc.getParameter().getValue() + " is not found in II"); - if (functionDesc.isCountDistinct()) { + if (HLLCMeasureType.isCountDistinct(functionDesc)) { return new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision()); } else { return new MetricInfo(MetricType.Normal, index); @@ -144,10 +145,10 @@ public class EndpointAggregators { MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length]; for (int i = 0; i < aggrs.length; i++) { if (metricInfos[i].type == MetricType.DistinctCount) { - aggrs[i] = MeasureAggregator.create(funcNames[i], dataTypes[i]); + aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType(dataTypes[i])); } else { //all other fixed length measures can be aggregated as long - aggrs[i] = MeasureAggregator.create(funcNames[i], "long"); + aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType("long")); } } return aggrs; http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java index e638386..4ec421b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java @@ -42,6 +42,7 @@ import org.apache.kylin.common.util.RangeUtil; import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.invertedindex.index.TableRecord; import org.apache.kylin.invertedindex.index.TableRecordInfo; +import org.apache.kylin.measure.hllc.HLLCMeasureType; import org.apache.kylin.metadata.filter.ConstantTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.FunctionDesc; @@ -205,7 +206,7 @@ public class EndpointTupleIterator implements ITupleIterator { boolean updated = false; for (TblColRef column : columns) { if (column.isSameAs(factTableName, functionDesc.getParameter().getValue())) { - if (functionDesc.isCountDistinct()) { + if (HLLCMeasureType.isCountDistinct(functionDesc)) { //TODO: default precision might need be configurable String iiDefaultHLLC = "hllc10"; functionDesc.setReturnType(iiDefaultHLLC); http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index a7d0776..c798289 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -374,12 +374,7 @@ public class CreateHTableJob extends AbstractHadoopJob { int space = 0; for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) { DataType returnType = measureDesc.getFunction().getReturnDataType(); - if (returnType.isHLLC()) { - // for HLL, it will be compressed when export to bytes - space += returnType.getStorageBytesEstimate() * 0.75; - } else { - space += returnType.getStorageBytesEstimate(); - } + space += returnType.getStorageBytesEstimate(); } bytesLength += space; http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java index f99ddb5..ecbf07a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java @@ -152,7 +152,7 @@ public class HBaseMROutput2 implements IMROutput2 { this.parsedKey = new ByteArrayWritable(); this.parsedValue = new Object[measuresDescs.size()]; - this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue); + this.parsedPair = Pair.newPair(parsedKey, parsedValue); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java index 3720123..e1b6b18 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java @@ -241,7 +241,7 @@ public class HBaseMROutput2Transition implements IMROutput2 { if (parsedPair == null) { parsedKey = new ByteArrayWritable(); parsedValue = new Object[seg.getCubeDesc().getMeasures().size()]; - parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue); + parsedPair = Pair.newPair(parsedKey, parsedValue); } // merge by cuboid files http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java index 617af76..59a5fed 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java @@ -122,34 +122,21 @@ public class RowValueDecoder implements Cloneable { } } - public boolean hasMemHungryCountDistinct() { + public boolean hasMemHungryMeasures() { for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) { FunctionDesc func = measures[i].getFunction(); - if (func.isCountDistinct() && !func.isHolisticCountDistinct()) { + if (func.getMeasureType().isMemoryHungry()) return true; - } } return false; } - public static boolean hasMemHungryCountDistinct(Collection<RowValueDecoder> rowValueDecoders) { + public static boolean hasMemHungryMeasures(Collection<RowValueDecoder> rowValueDecoders) { for (RowValueDecoder decoder : rowValueDecoders) { - if (decoder.hasMemHungryCountDistinct()) + if (decoder.hasMemHungryMeasures()) return true; } return false; } - public static MeasureDesc findTopN(Collection<RowValueDecoder> rowValueDecoders) { - for (RowValueDecoder decoder : rowValueDecoders) { - for (int i = decoder.projectionIndex.nextSetBit(0); i >= 0; i = decoder.projectionIndex.nextSetBit(i + 1)) { - MeasureDesc measure = decoder.measures[i]; - FunctionDesc func = measure.getFunction(); - if (func.isTopN()) - return measure; - } - } - return null; - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java index e71089a..8a88b6d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java @@ -325,7 +325,7 @@ public class GridTableHBaseBenchmark { int logicRows = nRows / nColumns; for (int i = 0; i < nColumns; i++) { if (rand.nextDouble() < hitRatio) { - hitsForColumnScan.add(new Pair<Integer, Integer>(i * logicRows, (i + 1) * logicRows)); + hitsForColumnScan.add(Pair.newPair(i * logicRows, (i + 1) * logicRows)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/e5ff988c/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java index f717d82..1d85922 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java @@ -167,7 +167,7 @@ public class TestFuzzyRowFilterV2EndToEnd { } buf.putInt(i); - Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask); + Pair<byte[], byte[]> pair = Pair.newPair(fuzzyKey, mask); list.add(pair); } @@ -200,7 +200,7 @@ public class TestFuzzyRowFilterV2EndToEnd { } buf.putInt(i * 2); - Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask); + Pair<byte[], byte[]> pair = Pair.newPair(fuzzyKey, mask); list.add(pair); } @@ -309,8 +309,8 @@ public class TestFuzzyRowFilterV2EndToEnd { byte[] mask2 = new byte[] { 0, 0, 0, 0, 0, 0, 1, 1, 1, 1 }; - Pair<byte[], byte[]> pair1 = new Pair<byte[], byte[]>(fuzzyKey1, mask1); - Pair<byte[], byte[]> pair2 = new Pair<byte[], byte[]>(fuzzyKey2, mask2); + Pair<byte[], byte[]> pair1 = Pair.newPair(fuzzyKey1, mask1); + Pair<byte[], byte[]> pair2 = Pair.newPair(fuzzyKey2, mask2); FuzzyRowFilterV2 fuzzyRowFilter1 = new FuzzyRowFilterV2(Lists.newArrayList(pair1)); FuzzyRowFilterV2 fuzzyRowFilter2 = new FuzzyRowFilterV2(Lists.newArrayList(pair2));