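KYLIN-2332 Refactor TupleConverter: extract ITupleConverter interface, move the tuple dimension comparator into SequentialCubeTupleIterator, and expose the storage query via StorageContext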
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/13c34079 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/13c34079 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/13c34079 Branch: refs/heads/master-cdh5.7 Commit: 13c34079daa0e102b817d15ccfe2d70b8b364c34 Parents: 687d5fd Author: Li Yang <liy...@apache.org> Authored: Thu Dec 29 12:18:45 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Dec 29 13:38:19 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/storage/IStorageQuery.java | 3 - .../apache/kylin/storage/StorageContext.java | 10 ++++ .../storage/gtrecord/CubeSegmentScanner.java | 3 +- .../storage/gtrecord/CubeTupleConverter.java | 60 +------------------- .../gtrecord/GTCubeStorageQueryBase.java | 7 ++- .../kylin/storage/gtrecord/ITupleConverter.java | 30 ++++++++++ .../gtrecord/SegmentCubeTupleIterator.java | 8 +-- .../gtrecord/SequentialCubeTupleIterator.java | 55 +++++++++++++++++- .../storage/hbase/cube/v2/CubeStorageQuery.java | 1 + 9 files changed, 108 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/13c34079/core-storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java index 6b53b5b..5455cc3 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/IStorageQuery.java @@ -23,9 +23,6 @@ import org.apache.kylin.metadata.tuple.ITupleIterator; import org.apache.kylin.metadata.tuple.TupleInfo; /** - * - * @author xjiang - * */ public interface IStorageQuery { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/13c34079/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index 9ef59fd..ec46f83 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -47,6 +47,7 @@ public class StorageContext { private boolean limitEnabled = false; private boolean enableCoprocessor = false; + private IStorageQuery storageQuery; private AtomicLong totalScanCount = new AtomicLong(); private Cuboid cuboid; private boolean partialResultReturned = false; @@ -193,4 +194,13 @@ public class StorageContext { public void setReusedPeriod(Range<Long> reusedPeriod) { this.reusedPeriod = reusedPeriod; } + + public IStorageQuery getStorageQuery() { + return storageQuery; + } + + public void setStorageQuery(IStorageQuery storageQuery) { + this.storageQuery = storageQuery; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/13c34079/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java index c6a6daa..9d6f946 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java @@ -52,7 +52,7 @@ public class CubeSegmentScanner implements IGTScanner { final GTScanRequest scanRequest; public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, // - 
Collection<FunctionDesc> metrics, TupleFilter originalfilter, StorageContext context, String gtStorage) { + Collection<FunctionDesc> metrics, TupleFilter originalfilter, StorageContext context) { logger.info("Init CubeSegmentScanner for segment {}", cubeSeg.getName()); @@ -78,6 +78,7 @@ public class CubeSegmentScanner implements IGTScanner { throw new RuntimeException(e); } scanRequest = scanRangePlanner.planScanRequest(); + String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage(); scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage); } http://git-wip-us.apache.org/repos/asf/kylin/blob/13c34079/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java index 3159318..280718f 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java @@ -18,11 +18,10 @@ package org.apache.kylin.storage.gtrecord; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; +import java.util.Set; import org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.Dictionary; @@ -37,18 +36,16 @@ import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.ITuple; import org.apache.kylin.metadata.tuple.Tuple; import org.apache.kylin.metadata.tuple.TupleInfo; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import 
com.google.common.collect.Maps; /** * convert GTRecord to tuple */ -public class CubeTupleConverter { +public class CubeTupleConverter implements ITupleConverter { final CubeSegment cubeSeg; final Cuboid cuboid; @@ -65,8 +62,6 @@ public class CubeTupleConverter { private final int nSelectedDims; - private final int[] dimensionIndexOnTuple; - public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, // Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) { this.cubeSeg = cubeSeg; @@ -88,18 +83,6 @@ public class CubeTupleConverter { advMeasureFillers = Lists.newArrayListWithCapacity(1); advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1); - // dimensionIndexOnTuple is for SQL with limit - List<Integer> temp = Lists.newArrayList(); - for (TblColRef dim : cuboid.getColumns()) { - if (tupleInfo.hasColumn(dim)) { - temp.add(tupleInfo.getColumnIndex(dim)); - } - } - dimensionIndexOnTuple = new int[temp.size()]; - for (int i = 0; i < temp.size(); i++) { - dimensionIndexOnTuple[i] = temp.get(i); - } - //////////// int i = 0; @@ -155,44 +138,6 @@ public class CubeTupleConverter { } } - public Comparator<ITuple> getTupleDimensionComparator() { - return new Comparator<ITuple>() { - @Override - public int compare(ITuple o1, ITuple o2) { - Preconditions.checkNotNull(o1); - Preconditions.checkNotNull(o2); - for (int i = 0; i < dimensionIndexOnTuple.length; i++) { - int index = dimensionIndexOnTuple[i]; - - if (index == -1) { - //TODO: - continue; - } - - Comparable a = (Comparable) o1.getAllValues()[index]; - Comparable b = (Comparable) o2.getAllValues()[index]; - - if (a == null && b == null) { - continue; - } else if (a == null) { - return 1; - } else if (b == null) { - return -1; - } else { - int temp = a.compareTo(b); - if (temp != 0) { - return temp; - } else { - continue; - } - } - } - - return 0; - } - }; - } - // load only needed dictionaries private Map<TblColRef, Dictionary<String>> 
buildDictionaryMap(List<TblColRef> columnsNeedDictionary) { Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); @@ -202,6 +147,7 @@ public class CubeTupleConverter { return result; } + @Override public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) { record.getValues(gtColIdx, gtValues); http://git-wip-us.apache.org/repos/asf/kylin/blob/13c34079/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 7b0cbb9..4fcfad1 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -72,6 +72,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { @Override public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { + context.setStorageQuery(this); //deal with participant columns in subquery join sqlDigest.includeSubqueryJoinParticipants(); @@ -131,7 +132,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { continue; } - scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context, getGTStorage()); + scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context); scanners.add(scanner); } @@ -142,6 +143,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { } protected abstract String getGTStorage(); + + protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo tupleInfo) { + return new CubeTupleConverter(cubeSeg, 
cuboid, selectedDimensions, selectedMetrics, tupleInfo); + } private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) { for (FunctionDesc func : sqlDigest.aggregations) { http://git-wip-us.apache.org/repos/asf/kylin/blob/13c34079/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java new file mode 100644 index 0000000..9c50d0c --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.storage.gtrecord; + +import java.util.List; + +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller; +import org.apache.kylin.metadata.tuple.Tuple; + +public interface ITupleConverter { + + public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/13c34079/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java index 00ba247..37699a3 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java @@ -50,7 +50,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator { protected final StorageContext context; protected Iterator<GTRecord> gtItr; - protected CubeTupleConverter cubeTupleConverter; + protected ITupleConverter cubeTupleConverter; protected Tuple next; private List<IAdvMeasureFiller> advMeasureFillers; @@ -67,7 +67,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator { this.tuple = new Tuple(returnTupleInfo); this.context = context; this.gtItr = getGTItr(scanner); - this.cubeTupleConverter = new CubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); + this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo); } private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) { @@ -151,8 +151,4 @@ public class SegmentCubeTupleIterator implements ITupleIterator { logger.error("Exception when 
close CubeScanner", e); } } - - public CubeTupleConverter getCubeTupleConverter() { - return cubeTupleConverter; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/13c34079/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index 3a64de7..ee868c7 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -19,6 +19,7 @@ package org.apache.kylin.storage.gtrecord; import java.io.IOException; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -36,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -73,9 +75,60 @@ public class SequentialCubeTupleIterator implements ITupleIterator { return input; } }); - tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, context.getFinalPushDownLimit(), segmentCubeTupleIterators.get(0).getCubeTupleConverter().getTupleDimensionComparator()).getIterator(); + tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, context.getFinalPushDownLimit(), getTupleDimensionComparator(cuboid, returnTupleInfo)).getIterator(); } } + + public Comparator<ITuple> getTupleDimensionComparator(Cuboid cuboid, TupleInfo returnTupleInfo) { + // dimensionIndexOnTuple is for SQL with limit + List<Integer> temp = Lists.newArrayList(); + for (TblColRef dim : cuboid.getColumns()) { + if (returnTupleInfo.hasColumn(dim)) { + 
temp.add(returnTupleInfo.getColumnIndex(dim)); + } + } + + final int[] dimensionIndexOnTuple = new int[temp.size()]; + for (int i = 0; i < temp.size(); i++) { + dimensionIndexOnTuple[i] = temp.get(i); + } + + return new Comparator<ITuple>() { + @Override + public int compare(ITuple o1, ITuple o2) { + Preconditions.checkNotNull(o1); + Preconditions.checkNotNull(o2); + for (int i = 0; i < dimensionIndexOnTuple.length; i++) { + int index = dimensionIndexOnTuple[i]; + + if (index == -1) { + //TODO: + continue; + } + + Comparable a = (Comparable) o1.getAllValues()[index]; + Comparable b = (Comparable) o2.getAllValues()[index]; + + if (a == null && b == null) { + continue; + } else if (a == null) { + return 1; + } else if (b == null) { + return -1; + } else { + int temp = a.compareTo(b); + if (temp != 0) { + return temp; + } else { + continue; + } + } + } + + return 0; + } + }; + } @Override public boolean hasNext() { http://git-wip-us.apache.org/repos/asf/kylin/blob/13c34079/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java index f9c9a2b..fe483ba 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; public class CubeStorageQuery extends GTCubeStorageQueryBase { + @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class); public CubeStorageQuery(CubeInstance cube) {