refactor
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b26b2489 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b26b2489 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b26b2489 Branch: refs/heads/master Commit: b26b2489baddcfd148eaf8f17330878bbf349048 Parents: aaf3b87 Author: Hongbin Ma <[email protected]> Authored: Mon Apr 11 17:33:07 2016 +0800 Committer: Hongbin Ma <[email protected]> Committed: Wed Apr 13 11:11:15 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/DeployLocalMetaToRemoteTest.java | 4 +- .../java/org/apache/kylin/job/DeployUtil.java | 4 +- .../org/apache/kylin/common/util/ByteArray.java | 11 +- .../kylin/common/util/Log4jConfigurer.java | 2 + .../common/util/AbstractKylinTestCase.java | 14 +- .../common/util/HBaseMetadataTestCase.java | 4 +- .../common/util/LocalFileMetadataTestCase.java | 8 +- .../org/apache/kylin/cube/cuboid/Cuboid.java | 18 ++ .../apache/kylin/cube/kv/LazyRowKeyEncoder.java | 25 -- .../org/apache/kylin/cube/model/CubeDesc.java | 5 + .../org/apache/kylin/gridtable/GTRecord.java | 10 +- .../org/apache/kylin/gridtable/GTScanRange.java | 2 - .../kylin/gridtable/GTScanRangePlanner.java | 294 ++++++++++++++++--- .../apache/kylin/gridtable/GTScanRequest.java | 68 +++-- .../org/apache/kylin/gridtable/IGTStorage.java | 27 ++ .../apache/kylin/gridtable/ScannerWorker.java | 72 +++++ .../kylin/cube/AggregationGroupRuleTest.java | 19 +- .../apache/kylin/cube/RowKeyAttrRuleTest.java | 5 +- .../cube/inmemcubing/DoggedCubeBuilderTest.java | 160 ---------- .../cube/inmemcubing/InMemCubeBuilderTest.java | 268 ----------------- .../kylin/gridtable/DictGridTableTest.java | 58 ++-- .../impl/threadpool/DefaultSchedulerTest.java | 151 ---------- .../filter/UDF/MassInValueProviderFactory.java | 1 - .../apache/kylin/storage/StorageMockUtils.java | 189 ++++++++++++ .../kylin/storage/cache/StorageMockUtils.java | 157 ---------- kylin-it/pom.xml | 7 + .../ITDoggedCubeBuilderStressTest.java | 8 +- .../inmemcubing/ITDoggedCubeBuilderTest.java | 163 ++++++++++ .../inmemcubing/ITInMemCubeBuilderTest.java | 271 +++++++++++++++++ .../impl/threadpool/ITDefaultSchedulerTest.java | 154 ++++++++++ .../kylin/provision/BuildCubeWithEngine.java | 4 +- .../kylin/provision/BuildCubeWithSpark.java | 6 +- .../kylin/provision/BuildCubeWithStream.java | 4 +- .../kylin/provision/BuildIIWithEngine.java | 4 +- .../kylin/provision/BuildIIWithStream.java | 2 +- .../org/apache/kylin/query/KylinTestBase.java | 11 - .../kylin/storage/hbase/ITStorageTest.java | 2 +- source-hive/pom.xml | 7 + .../kylin/source/hive/HiveCmdBuilderTest.java | 6 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 111 ++++--- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 56 +--- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 94 ++++-- .../hbase/cube/v2/CubeSegmentScanner.java | 215 +------------- .../storage/hbase/cube/v2/CubeStorageQuery.java | 30 +- .../kylin/storage/hbase/cube/v2/RawScan.java | 10 + .../coprocessor/endpoint/CubeVisitService.java | 88 ++++-- .../hbase/steps/SandboxMetastoreCLI.java | 2 +- 47 files changed, 1539 insertions(+), 1292 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java b/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java index 1267ab7..8494607 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployLocalMetaToRemoteTest.java @@ -45,7 +45,7 @@ public class DeployLocalMetaToRemoteTest { public static void beforeClass() throws Exception { logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); } @@ -53,7 +53,7 @@ public class DeployLocalMetaToRemoteTest { @Before public void before() throws Exception { - HBaseMetadataTestCase.staticCreateTestMetadata(AbstractKylinTestCase.SANDBOX_TEST_DATA); + HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); DeployUtil.initCliWorkDir(); DeployUtil.deployMetadata(); http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index 2f51475..513f546 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -34,7 +34,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.ResourceTool; -import org.apache.kylin.common.util.AbstractKylinTestCase; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeUpdate; @@ -66,7 +66,7 @@ public class DeployUtil { public static void deployMetadata() throws IOException { // install metadata to hbase ResourceTool.reset(config()); - ResourceTool.copy(KylinConfig.createInstanceFromUri(AbstractKylinTestCase.LOCALMETA_TEST_DATA), config()); + ResourceTool.copy(KylinConfig.createInstanceFromUri(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA), config()); // update cube desc signature. for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java index 5e35257..d850f8e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ByteArray.java @@ -100,7 +100,12 @@ public class ByteArray implements Comparable<ByteArray>, Serializable { } public ByteArray copy() { - ByteArray copy = new ByteArray(length); + ByteArray copy; + if (data != null) { + copy = new ByteArray(length); + } else { + copy = new ByteArray(null); + } copy.copyFrom(this); return copy; } @@ -116,7 +121,9 @@ public class ByteArray implements Comparable<ByteArray>, Serializable { } public void copyFrom(ByteArray other) { - System.arraycopy(other.array(), other.offset, data, offset, other.length); + if (other.data != null) { + System.arraycopy(other.array(), other.offset, data, offset, other.length); + } this.length = other.length; } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java b/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java index fe0c55a..696aaff 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/Log4jConfigurer.java @@ -21,6 +21,7 @@ package org.apache.kylin.common.util; import java.util.Enumeration; import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; @@ -35,6 +36,7 @@ public class Log4jConfigurer { public static void initLogger() { if (!INITIALIZED && !isConfigured()) { org.apache.log4j.BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(DEFAULT_PATTERN_LAYOUT))); + LogManager.getRootLogger().setLevel(Level.DEBUG); } INITIALIZED = true; } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java index d517930..3a4b7bb 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java @@ -28,16 +28,14 @@ import org.apache.kylin.common.KylinConfig; */ public abstract class AbstractKylinTestCase { - public static final String LOCALMETA_TEST_DATA = "../examples/test_case_data/localmeta"; - public static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox"; - - public static final String[] SERVICES_WITH_CACHE = {// - "org.apache.kylin.cube.CubeManager",// + public static final String[] SERVICES_WITH_CACHE = { // + "org.apache.kylin.cube.CubeManager", // "org.apache.kylin.cube.CubeDescManager", // - "org.apache.kylin.invertedindex.IIDescManager",// - "org.apache.kylin.invertedindex.IIManager",// - "org.apache.kylin.storage.hybrid.HybridManager", "org.apache.kylin.metadata.realization.RealizationRegistry", // + "org.apache.kylin.invertedindex.IIDescManager", // + "org.apache.kylin.invertedindex.IIManager", // + "org.apache.kylin.storage.hybrid.HybridManager", // + "org.apache.kylin.metadata.realization.RealizationRegistry", // "org.apache.kylin.metadata.project.ProjectManager", // "org.apache.kylin.metadata.MetadataManager" // }; http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java index e0be1c2..fdf066b 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java @@ -26,9 +26,11 @@ import org.apache.kylin.common.KylinConfig; */ public class HBaseMetadataTestCase extends AbstractKylinTestCase { + public static String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox"; + static { try { - ClassUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath()); + ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath()); } catch (Exception e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java index 70c849c..e2f2d8b 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/LocalFileMetadataTestCase.java @@ -27,9 +27,15 @@ import org.apache.kylin.common.persistence.ResourceStore; public class LocalFileMetadataTestCase extends AbstractKylinTestCase { + public static String LOCALMETA_TEST_DATA = "../examples/test_case_data/localmeta"; + @Override public void createTestMetadata() { - staticCreateTestMetadata(); + staticCreateTestMetadata(getLocalMetaTestData()); + } + + protected String getLocalMetaTestData() { + return LOCALMETA_TEST_DATA; } public static void staticCreateTestMetadata() { http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java index 7f8d2b8..e8a75d7 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang.StringUtils; @@ -33,6 +34,7 @@ import org.apache.kylin.cube.model.AggregationGroup; import org.apache.kylin.cube.model.AggregationGroup.HierarchyMask; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.RowKeyColDesc; +import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; import com.google.common.base.Function; @@ -52,6 +54,20 @@ public class Cuboid implements Comparable<Cuboid> { } }; + public static Cuboid identifyCuboid(CubeDesc cubeDesc, Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { + for (FunctionDesc metric : metrics) { + if (metric.getMeasureType().onlyAggrInBaseCuboid()) + return Cuboid.getBaseCuboid(cubeDesc); + } + + long cuboidID = 0; + for (TblColRef column : dimensions) { + int index = cubeDesc.getRowkey().getColumnBitIndex(column); + cuboidID |= 1L << index; + } + return Cuboid.findById(cubeDesc, cuboidID); + } + public static Cuboid findById(CubeDesc cube, byte[] cuboidID) { return findById(cube, Bytes.toLong(cuboidID)); } @@ -397,6 +413,8 @@ public class Cuboid implements Comparable<Cuboid> { return cuboidToGridTableMapping; } + + public static String getDisplayName(long cuboidID, int dimensionCount) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < dimensionCount; ++i) { http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java index c93f65e..9c3d037 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/LazyRowKeyEncoder.java @@ -18,15 +18,9 @@ package org.apache.kylin.cube.kv; -import com.google.common.collect.Lists; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; -import java.util.Arrays; -import java.util.List; - /** * A LazyRowKeyEncoder will not try to calculate shard * It works for both enableSharding or non-enableSharding scenario @@ -45,23 +39,4 @@ public class LazyRowKeyEncoder extends RowKeyEncoder { } } - - //for non-sharding cases it will only return one byte[] with not shard at beginning - public List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) { - final short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); - - if (!enableSharding) { - return Lists.newArrayList(halfCookedKey);//not shard to append at head, so it is already well cooked - } else { - List<byte[]> ret = Lists.newArrayList(); - for (short i = 0; i < cuboidShardNum; ++i) { - short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards()); - byte[] cookedKey = Arrays.copyOf(halfCookedKey, halfCookedKey.length); - BytesUtil.writeShort(shard, cookedKey, 0, RowConstants.ROWKEY_SHARDID_LEN); - ret.add(cookedKey); - } - return ret; - } - - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 240cf52..65ba0a5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -900,6 +900,11 @@ public class CubeDesc extends RootPersistentEntity { } } + public TblColRef getColumnByBitIndex(int bitIndex) { + RowKeyColDesc[] rowKeyColumns = this.getRowkey().getRowKeyColumns(); + return rowKeyColumns[rowKeyColumns.length - 1 - bitIndex].getColRef(); + } + public boolean hasMemoryHungryMeasures() { for (MeasureDesc measure : measures) { if (measure.getFunction().getMeasureType().isMemoryHungry()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index d09ab9a..bccd0c5 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@ -47,7 +47,6 @@ public class GTRecord implements Comparable<GTRecord> { } this.info = info; this.maskForEqualHashComp = maskForEqualHashComp; - } public GTRecord(GTInfo info) { @@ -58,6 +57,15 @@ public class GTRecord implements Comparable<GTRecord> { this(info, info.colAll, cols); } + public GTRecord(GTRecord other) { + this.info = other.info; + this.maskForEqualHashComp = other.maskForEqualHashComp; + this.cols = new ByteArray[info.getColumnCount()]; + for (int i = 0; i < other.cols.length; i++) { + this.cols[i] = other.cols[i].copy(); + } + } + public GTInfo getInfo() { return info; } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java index 56f55f6..e1b38dc 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java @@ -29,8 +29,6 @@ public class GTScanRange { final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded final public List<GTRecord> fuzzyKeys; // partial matching primary keys - - public GTScanRange(GTRecord pkStart, GTRecord pkEnd) { this(pkStart, pkEnd, null); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java index 2307aaf..3f9bac0 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java @@ -18,69 +18,166 @@ package org.apache.kylin.gridtable; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.common.FuzzyValueCombination; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.gridtable.CubeGridTable; +import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.ConstantTupleFilter; import org.apache.kylin.metadata.filter.LogicalTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; +import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; public class GTScanRangePlanner { private static final Logger logger = LoggerFactory.getLogger(GTScanRangePlanner.class); + private static final int MAX_SCAN_RANGES = 200; private static final int MAX_HBASE_FUZZY_KEYS = 100; - final private GTInfo info; - final private Pair<ByteArray, ByteArray> segmentStartAndEnd; - final private TblColRef partitionColRef; + protected int maxScanRanges; + + //non-GT + protected CubeSegment cubeSegment; + protected CubeDesc cubeDesc; + protected Cuboid cuboid; + protected TupleFilter filter; + protected Set<TblColRef> dimensions; + protected Set<TblColRef> groupbyDims; + protected Set<TblColRef> filterDims; + protected Collection<FunctionDesc> metrics; + + //GT + protected TupleFilter gtFilter; + protected GTInfo gtInfo; + protected Pair<ByteArray, ByteArray> gtStartAndEnd; + protected TblColRef gtPartitionCol; + protected ImmutableBitSet gtDimensions; + protected ImmutableBitSet gtAggrGroups; + protected ImmutableBitSet gtAggrMetrics; + protected String[] gtAggrFuncs; + final protected RecordComparator rangeStartComparator; + final protected RecordComparator rangeEndComparator; + final protected RecordComparator rangeStartEndComparator; + + public GTScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, Set<TblColRef> groupbyDims, // + Collection<FunctionDesc> metrics) { + + this.maxScanRanges = MAX_SCAN_RANGES; + + this.cubeSegment = cubeSegment; + this.cubeDesc = cubeSegment.getCubeDesc(); + this.cuboid = cuboid; + this.dimensions = dimensions; + this.groupbyDims = groupbyDims; + this.filter = filter; + this.metrics = metrics; + this.filterDims = Sets.newHashSet(); + TupleFilter.collectColumns(filter, this.filterDims); + + this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId()); + CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping(); + + IGTComparator comp = gtInfo.codeSystem.getComparator(); + //start key GTRecord compare to start key GTRecord + this.rangeStartComparator = getRangeStartComparator(comp); + //stop key GTRecord compare to stop key GTRecord + this.rangeEndComparator = getRangeEndComparator(comp); + //start key GTRecord compare to stop key GTRecord + this.rangeStartEndComparator = getRangeStartEndComparator(comp); + + //replace the constant values in filter to dictionary codes + this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getCuboidDimensionsInGTOrder(), this.groupbyDims); - final private RecordComparator rangeStartComparator; - final private RecordComparator rangeEndComparator; - final private RecordComparator rangeStartEndComparator; + this.gtDimensions = makeGridTableColumns(mapping, dimensions); + this.gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groupbyDims, cubeSegment.getCubeDesc())); + this.gtAggrMetrics = makeGridTableColumns(mapping, metrics); + this.gtAggrFuncs = makeAggrFuncs(mapping, metrics); + + if (cubeSegment.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) { + int index = mapping.getIndexOf(cubeSegment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef()); + if (index >= 0) { + this.gtStartAndEnd = getSegmentStartAndEnd(index); + this.gtPartitionCol = gtInfo.colRef(index); + } + } + + } /** + * constrcut GTScanRangePlanner with incomplete information. only be used for UT * @param info - * @param segmentStartAndEnd in GT encoding - * @param partitionColRef the TblColRef in GT + * @param gtStartAndEnd + * @param gtPartitionCol + * @param gtFilter */ - public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> segmentStartAndEnd, TblColRef partitionColRef) { - - this.info = info; - this.segmentStartAndEnd = segmentStartAndEnd; - this.partitionColRef = partitionColRef; + public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) { - IGTComparator comp = info.codeSystem.getComparator(); + this.maxScanRanges = MAX_SCAN_RANGES; + this.gtInfo = info; + IGTComparator comp = gtInfo.codeSystem.getComparator(); //start key GTRecord compare to start key GTRecord this.rangeStartComparator = getRangeStartComparator(comp); //stop key GTRecord compare to stop key GTRecord this.rangeEndComparator = getRangeEndComparator(comp); //start key GTRecord compare to stop key GTRecord this.rangeStartEndComparator = getRangeStartEndComparator(comp); - } - // return empty list meaning filter is always false - public List<GTScanRange> planScanRanges(TupleFilter filter) { - return planScanRanges(filter, Integer.MAX_VALUE); + + this.gtFilter = gtFilter; + this.gtStartAndEnd = gtStartAndEnd; + this.gtPartitionCol = gtPartitionCol; } + - // return empty list meaning filter is always false - public List<GTScanRange> planScanRanges(TupleFilter filter, int maxRanges) { + public GTScanRequest planScanRequest(boolean allowPreAggregate) { + GTScanRequest scanRequest; + List<GTScanRange> scanRanges = this.planScanRanges(); + if (scanRanges != null && scanRanges.size() != 0) { + scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB()); + } else { + scanRequest = null; + } + return scanRequest; + } - TupleFilter flatFilter = flattenToOrAndFilter(filter); + /** + * Overwrite this method to provide smarter storage visit plans + * @return + */ + public List<GTScanRange> planScanRanges() { + TupleFilter flatFilter = flattenToOrAndFilter(gtFilter); List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter); @@ -92,11 +189,108 @@ public class GTScanRangePlanner { } List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges); - mergedRanges = mergeTooManyRanges(mergedRanges, maxRanges); + mergedRanges = mergeTooManyRanges(mergedRanges, maxScanRanges); return mergedRanges; } + private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(int index) { + ByteArray start; + if (cubeSegment.getDateRangeStart() != Long.MIN_VALUE) { + start = encodeTime(cubeSegment.getDateRangeStart(), index, 1); + } else { + start = new ByteArray(); + } + + ByteArray end; + if (cubeSegment.getDateRangeEnd() != Long.MAX_VALUE) { + end = encodeTime(cubeSegment.getDateRangeEnd(), index, -1); + } else { + end = new ByteArray(); + } + return Pair.newPair(start, end); + + } + + private ByteArray encodeTime(long ts, int index, int roundingFlag) { + String value; + DataType partitionColType = gtInfo.getColumnType(index); + if (partitionColType.isDate()) { + value = DateFormat.formatToDateStr(ts); + } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { + value = DateFormat.formatToTimeWithoutMilliStr(ts); + } else if (partitionColType.isStringFamily()) { + String partitionDateFormat = cubeSegment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat(); + if (StringUtils.isEmpty(partitionDateFormat)) + partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN; + value = DateFormat.formatToDateStr(ts, partitionDateFormat); + } else { + throw new RuntimeException("Type " + partitionColType + " is not valid partition column type"); + } + + ByteBuffer buffer = ByteBuffer.allocate(gtInfo.getMaxColumnLength()); + gtInfo.getCodeSystem().encodeColumnValue(index, value, roundingFlag, buffer); + + return ByteArray.copyOf(buffer.array(), 0, buffer.position()); + } + + private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) { + Set<TblColRef> ret = Sets.newHashSet(); + for (TblColRef col : input) { + if (cubeDesc.hasHostColumn(col)) { + for (TblColRef host : cubeDesc.getHostInfo(col).columns) { + ret.add(host); + } + } else { + ret.add(col); + } + } + return ret; + } + + private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) { + BitSet result = new BitSet(); + for (TblColRef dim : dimensions) { + int idx = mapping.getIndexOf(dim); + if (idx >= 0) + result.set(idx); + } + return new ImmutableBitSet(result); + } + + private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) { + BitSet result = new BitSet(); + for (FunctionDesc metric : metrics) { + int idx = mapping.getIndexOf(metric); + if (idx < 0) + throw new IllegalStateException(metric + " not found in " + mapping); + result.set(idx); + } + return new ImmutableBitSet(result); + } + + private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) { + + //metrics are represented in ImmutableBitSet, which loses order information + //sort the aggrFuns to align with metrics natural order + List<FunctionDesc> metricList = Lists.newArrayList(metrics); + Collections.sort(metricList, new Comparator<FunctionDesc>() { + @Override + public int compare(FunctionDesc o1, FunctionDesc o2) { + int a = mapping.getIndexOf(o1); + int b = mapping.getIndexOf(o2); + return a - b; + } + }); + + String[] result = new String[metricList.size()]; + int i = 0; + for (FunctionDesc metric : metricList) { + result[i++] = metric.getExpression(); + } + return result; + } + private String makeReadable(ByteArray byteArray) { if (byteArray == null) { return null; @@ -105,29 +299,29 @@ public class GTScanRangePlanner { } } - private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) { - GTRecord pkStart = new GTRecord(info); - GTRecord pkEnd = new GTRecord(info); + protected GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) { + GTRecord pkStart = new GTRecord(gtInfo); + GTRecord pkEnd = new GTRecord(gtInfo); Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap(); List<GTRecord> fuzzyKeys; for (ColumnRange range : andDimRanges) { - if (partitionColRef != null && range.column.equals(partitionColRef)) { - if (rangeStartEndComparator.comparator.compare(segmentStartAndEnd.getFirst(), range.end) <= 0 // - && (rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) < 0 // - || rangeStartEndComparator.comparator.compare(range.begin, segmentStartAndEnd.getSecond()) == 0 // - && (range.op == FilterOperatorEnum.EQ || range.op == FilterOperatorEnum.LTE || range.op == FilterOperatorEnum.GTE || range.op == FilterOperatorEnum.IN))) { + if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) { + if (rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end) <= 0 // + && (rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond()) < 0 // + || rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond()) == 0 // + && (range.op == FilterOperatorEnum.EQ || range.op == FilterOperatorEnum.LTE || range.op == FilterOperatorEnum.GTE || range.op == FilterOperatorEnum.IN))) { //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded, so use <= when has equals in condition. } else { - logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}",// - new Object[] { partitionColRef, makeReadable(segmentStartAndEnd.getFirst()), makeReadable(segmentStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) }); + logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", // + new Object[] { gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end) }); return null; } } int col = range.column.getColumnDesc().getZeroBasedIndex(); - if (!info.primaryKey.get(col)) + if (!gtInfo.primaryKey.get(col)) continue; pkStart.set(col, range.begin); @@ -139,7 +333,6 @@ public class GTScanRangePlanner { } fuzzyKeys = buildFuzzyKeys(fuzzyValues); - return new GTScanRange(pkStart, pkEnd, fuzzyKeys); } @@ -154,17 +347,16 @@ public class GTScanRangePlanner { logger.info("The execution of this query will not use fuzzy key"); return result; } - List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, MAX_HBASE_FUZZY_KEYS); for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) { - BitSet bitSet = new BitSet(info.getColumnCount()); + BitSet bitSet = new BitSet(gtInfo.getColumnCount()); for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { bitSet.set(entry.getKey()); } - GTRecord fuzzy = new GTRecord(info, new ImmutableBitSet(bitSet)); + GTRecord fuzzy = new GTRecord(gtInfo, new ImmutableBitSet(bitSet)); for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { fuzzy.set(entry.getKey(), entry.getValue()); } @@ -174,7 +366,7 @@ public class GTScanRangePlanner { return result; } - private TupleFilter flattenToOrAndFilter(TupleFilter filter) { + protected TupleFilter flattenToOrAndFilter(TupleFilter filter) { if (filter == null) return null; @@ -193,7 +385,7 @@ public class GTScanRangePlanner { return flatFilter; } - private List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) { + protected List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) { List<Collection<ColumnRange>> result = Lists.newArrayList(); if (flatFilter == null) { @@ -272,7 +464,7 @@ public class GTScanRangePlanner { return orAndRanges; } - private List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) { + protected List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) { if (ranges.size() <= 1) { return ranges; } @@ -339,7 +531,7 @@ public class GTScanRangePlanner { return new GTScanRange(start, end, newFuzzyKeys); } - private List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) { + protected List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) { if (ranges.size() <= maxRanges) { return ranges; } @@ -351,7 +543,15 @@ public class GTScanRangePlanner { return result; } - private class ColumnRange { + public int getMaxScanRanges() { + return maxScanRanges; + } + + public void setMaxScanRanges(int maxScanRanges) { + this.maxScanRanges = maxScanRanges; + } + + protected class ColumnRange { private TblColRef column; private ByteArray begin = ByteArray.EMPTY; private ByteArray end = ByteArray.EMPTY; @@ -412,7 +612,7 @@ public class GTScanRangePlanner { if (valueSet != null) { return valueSet.isEmpty(); } else if (begin.array() != null && end.array() != null) { - return info.codeSystem.getComparator().compare(begin, end) > 0; + return gtInfo.codeSystem.getComparator().compare(begin, end) > 0; } else { return false; } http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 1edfb36..c4abb57 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -37,7 +37,7 @@ import com.google.common.collect.Sets; public class GTScanRequest { private GTInfo info; - private GTScanRange range; + private List<GTScanRange> ranges; private ImmutableBitSet columns; private transient ImmutableBitSet selectedColBlocks; @@ -53,18 +53,26 @@ public class GTScanRequest { private boolean allowPreAggregation = true; private double aggrCacheGB = 0; // no limit - public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet columns, TupleFilter filterPushDown) { + public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) { this.info = info; - this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range; + if (ranges == null) { + this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info))); + } else { + this.ranges = ranges; + } this.columns = columns; this.filterPushDown = filterPushDown; validate(info); } - public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, // + public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, // ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) { this.info = info; - this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range; + if (ranges == null) { + this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info))); + } else { + this.ranges = ranges; + } this.columns = dimensions; this.filterPushDown = filterPushDown; @@ -137,7 +145,7 @@ public class GTScanRequest { /** * doFilter,doAggr,doMemCheck are only for profiling use. * in normal cases they are all true. - * + * <p/> * Refer to CoprocessorBehavior for explanation */ public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException { @@ -195,16 +203,12 @@ public class GTScanRequest { return info; } - public GTRecord getPkStart() { - return range.pkStart; + public List<GTScanRange> getGTScanRanges() { + return ranges; } - public GTRecord getPkEnd() { - return range.pkEnd; - } - - public List<GTRecord> getFuzzyKeys() { - return range.fuzzyKeys; + public void setGTScanRanges(List<GTScanRange> ranges) { + this.ranges = ranges; } public ImmutableBitSet getSelectedColBlocks() { @@ -241,7 +245,7 @@ public class GTScanRequest { @Override public String toString() { - return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]"; + return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]"; } public static final BytesSerializer<GTScanRequest> serializer = new BytesSerializer<GTScanRequest>() { @@ -249,11 +253,14 @@ public class GTScanRequest { public void serialize(GTScanRequest value, ByteBuffer out) { GTInfo.serializer.serialize(value.info, out); - serializeGTRecord(value.range.pkStart, out); - serializeGTRecord(value.range.pkEnd, out); - BytesUtil.writeVInt(value.range.fuzzyKeys.size(), out); - for (GTRecord f : value.range.fuzzyKeys) { - serializeGTRecord(f, out); + BytesUtil.writeVInt(value.ranges.size(), out); + for (GTScanRange range : value.ranges) { + serializeGTRecord(range.pkStart, out); + serializeGTRecord(range.pkEnd, out); + BytesUtil.writeVInt(range.fuzzyKeys.size(), out); + for (GTRecord f : range.fuzzyKeys) { + serializeGTRecord(f, out); + } } ImmutableBitSet.serializer.serialize(value.columns, out); @@ -270,14 +277,19 @@ public class GTScanRequest { public GTScanRequest deserialize(ByteBuffer in) { GTInfo sInfo = GTInfo.serializer.deserialize(in); - GTRecord sPkStart = deserializeGTRecord(in, sInfo); - GTRecord sPkEnd = deserializeGTRecord(in, sInfo); - List<GTRecord> sFuzzyKeys = Lists.newArrayList(); - int sFuzzyKeySize = BytesUtil.readVInt(in); - for (int i = 0; i < sFuzzyKeySize; i++) { - sFuzzyKeys.add(deserializeGTRecord(in, sInfo)); + List<GTScanRange> sRanges = Lists.newArrayList(); + int sRangesCount = BytesUtil.readVInt(in); + for (int rangeIdx = 0; rangeIdx < sRangesCount; rangeIdx++) { + GTRecord sPkStart = deserializeGTRecord(in, sInfo); + GTRecord sPkEnd = deserializeGTRecord(in, sInfo); + List<GTRecord> sFuzzyKeys = Lists.newArrayList(); + int sFuzzyKeySize = BytesUtil.readVInt(in); + for (int i = 0; i < sFuzzyKeySize; i++) { + sFuzzyKeys.add(deserializeGTRecord(in, sInfo)); + } + GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys); + sRanges.add(sRange); } - GTScanRange sRange = new GTScanRange(sPkStart, sPkEnd, sFuzzyKeys); ImmutableBitSet sColumns = ImmutableBitSet.serializer.deserialize(in); TupleFilter sGTFilter = GTUtil.deserializeGTFilter(BytesUtil.readByteArray(in), sInfo); @@ -288,7 +300,7 @@ public class GTScanRequest { boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1); double sAggrCacheGB = in.getDouble(); - return new GTScanRequest(sInfo, sRange, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB); + return new GTScanRequest(sInfo, sRanges, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB); } private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) { http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java new file mode 100644 index 0000000..ff95743 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStorage.java @@ -0,0 +1,27 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * / + */ + +package org.apache.kylin.gridtable; + +import java.io.IOException; + +public interface IGTStorage { + IGTScanner getGTScanner(final GTScanRequest scanRequests) throws IOException; +} http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java new file mode 100644 index 0000000..75fa94b --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java @@ -0,0 +1,72 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * / + */ + +package org.apache.kylin.gridtable; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; + +import org.apache.kylin.common.debug.BackdoorToggles; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScannerWorker { + + private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class); + private IGTScanner internal = null; + + public ScannerWorker(CubeSegment cubeSeg, Cuboid cuboid, GTScanRequest scanRequest) { + if (scanRequest == null) { + logger.info("Segment {} will be skipped", cubeSeg); + internal = new EmptyGTScanner(0); + return; + } + + final GTInfo info = scanRequest.getInfo(); + + try { + IGTStorage rpc; + if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) { + rpc = (IGTStorage) Class.forName("org.apache.kylin.storage.hbase.cube.v2.CubeHBaseScanRPC").getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // for local debug + } else { + rpc = (IGTStorage) Class.forName("org.apache.kylin.storage.hbase.cube.v2.CubeHBaseEndpointRPC").getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // default behavior + } + internal = rpc.getGTScanner(scanRequest); + } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) { + throw new RuntimeException("error", e); + } + } + + public Iterator<GTRecord> iterator() { + return internal.iterator(); + } + + public void close() throws IOException { + internal.close(); + } + + public int getScannedRowCount() { + return internal.getScannedRowCount(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java index 53952c4..6c3d544 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/AggregationGroupRuleTest.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.Arrays; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.validation.IValidatorRule; import org.apache.kylin.cube.model.validation.ValidateContext; @@ -39,7 +40,7 @@ public class AggregationGroupRuleTest { public void testGoodDesc() throws IOException { AggregationGroupRule rule = getAggregationGroupRule(); - for (File f : new File("../examples/test_case_data/localmeta/cube_desc/").listFiles()) { + for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) { CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class); ValidateContext vContext = new ValidateContext(); rule.validate(desc, vContext); @@ -57,7 +58,7 @@ public class AggregationGroupRuleTest { } }; - for (File f : new File("../examples/test_case_data/localmeta/cube_desc/").listFiles()) { + for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) { System.out.println(f.getName()); CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class); ValidateContext vContext = new ValidateContext(); @@ -72,9 +73,9 @@ public class AggregationGroupRuleTest { public void testGoodDesc2() throws IOException { ValidateContext vContext = new ValidateContext(); - CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class); - desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] {// - new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "CATEG_LVL2_NAME" } }; + CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class); + desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] { // + new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "CATEG_LVL2_NAME" } }; IValidatorRule<CubeDesc> rule = getAggregationGroupRule(); rule.validate(desc, vContext); @@ -86,7 +87,7 @@ public class AggregationGroupRuleTest { public void testBadDesc1() throws IOException { ValidateContext vContext = new ValidateContext(); - CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class); + CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class); String[] temp = Arrays.asList(desc.getAggregationGroups().get(0).getIncludes()).subList(0, 3).toArray(new String[3]); desc.getAggregationGroups().get(0).setIncludes(temp); @@ -103,9 +104,9 @@ public class AggregationGroupRuleTest { public void testBadDesc2() throws IOException { ValidateContext vContext = new ValidateContext(); - CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class); - desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] {// - new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "META_CATEG_NAME", "CATEG_LVL2_NAME" } }; + CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class); + desc.getAggregationGroups().get(0).getSelectRule().joint_dims = new String[][] { // + new String[] { "lstg_format_name", "lstg_site_id", "slr_segment_cd", "META_CATEG_NAME", "CATEG_LVL2_NAME" } }; IValidatorRule<CubeDesc> rule = getAggregationGroupRule(); rule.validate(desc, vContext); http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java index b07d360..636102c 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/RowKeyAttrRuleTest.java @@ -25,6 +25,7 @@ import java.io.FileInputStream; import java.io.IOException; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.validation.IValidatorRule; import org.apache.kylin.cube.model.validation.ValidateContext; @@ -35,7 +36,7 @@ public class RowKeyAttrRuleTest { @Test public void testGoodDesc() throws IOException { - for (File f : new File("../examples/test_case_data/localmeta/cube_desc/").listFiles()) { + for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) { CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class); ValidateContext vContext = new ValidateContext(); IValidatorRule<CubeDesc> rule = new RowKeyAttrRule(); @@ -48,7 +49,7 @@ public class RowKeyAttrRuleTest { @Test public void testBadDesc() throws IOException { ValidateContext vContext = new ValidateContext(); - CubeDesc desc = JsonUtil.readValue(new FileInputStream("../examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class); + CubeDesc desc = JsonUtil.readValue(new FileInputStream(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_with_slr_desc.json"), CubeDesc.class); desc.getRowkey().getRowKeyColumns()[2].setColumn(""); IValidatorRule<CubeDesc> rule = new RowKeyAttrRule(); rule.validate(desc, vContext); http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java deleted file mode 100644 index 80e3df1..0000000 --- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.cube.inmemcubing; - -import static org.junit.Assert.assertEquals; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintWriter; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.dimension.Dictionary; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.metadata.model.TblColRef; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase { - - @SuppressWarnings("unused") - private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderTest.class); - - private static final int INPUT_ROWS = 10000; - private static final int SPLIT_ROWS = 5000; - private static final int THREADS = 4; - - private static CubeInstance cube; - private static String flatTable; - private static Map<TblColRef, Dictionary<String>> dictionaryMap; - - @BeforeClass - public static void before() throws IOException { - staticCreateTestMetadata(); - - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - CubeManager cubeManager = CubeManager.getInstance(kylinConfig); - - cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty"); - flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv"; - dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable); - } - - @AfterClass - public static void after() throws Exception { - staticCleanupTestMetadata(); - } - - @Test - public void test() throws Exception { - - ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000); - ExecutorService executorService = Executors.newSingleThreadExecutor(); - long randSeed = System.currentTimeMillis(); - - DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); - doggedBuilder.setConcurrentThreads(THREADS); - doggedBuilder.setSplitRowThreshold(SPLIT_ROWS); - FileRecordWriter doggedResult = new FileRecordWriter(); - - { - Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult)); - InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); - future.get(); - doggedResult.close(); - } - - InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap); - inmemBuilder.setConcurrentThreads(THREADS); - FileRecordWriter inmemResult = new FileRecordWriter(); - - { - Future<?> future = executorService.submit(inmemBuilder.buildAsRunnable(queue, inmemResult)); - InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); - future.get(); - inmemResult.close(); - } - - fileCompare(doggedResult.file, inmemResult.file); - doggedResult.file.delete(); - inmemResult.file.delete(); - } - - private void fileCompare(File file, File file2) throws IOException { - BufferedReader r1 = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8")); - BufferedReader r2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2), "UTF-8")); - - String line1, line2; - do { - line1 = r1.readLine(); - line2 = r2.readLine(); - - assertEquals(line1, line2); - - } while (line1 != null || line2 != null); - - r1.close(); - r2.close(); - } - - class FileRecordWriter implements ICuboidWriter { - - File file; - PrintWriter writer; - - FileRecordWriter() throws IOException { - file = File.createTempFile("DoggedCubeBuilderTest_", ".data"); - writer = new PrintWriter(file, "UTF-8"); - } - - @Override - public void write(long cuboidId, GTRecord record) throws IOException { - writer.print(cuboidId); - writer.print(", "); - writer.print(record.toString()); - writer.println(); - } - - @Override - public void flush() { - - } - - @Override - public void close() { - writer.close(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java deleted file mode 100644 index 88573c6..0000000 --- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.cube.inmemcubing; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.commons.io.FileUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.dict.DictionaryGenerator; -import org.apache.kylin.dict.IterableDictionaryValueEnumerator; -import org.apache.kylin.dimension.Dictionary; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - */ -public class InMemCubeBuilderTest extends LocalFileMetadataTestCase { - - private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderTest.class); - - private CubeInstance cube; - private String flatTable; - private Map<TblColRef, Dictionary<String>> dictionaryMap; - - private int nInpRows; - private int nThreads; - - @Before - public void before() throws IOException { - createTestMetadata(); - } - - @After - public void after() throws Exception { - cleanupTestMetadata(); - } - - @Test - public void testKylinCube() throws Exception { - testBuild("test_kylin_cube_without_slr_left_join_empty", // - "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv", 70000, 4); - } - - @Test - public void testSSBCube() throws Exception { - testBuild("ssb", // - "../examples/test_case_data/localmeta/data/kylin_intermediate_ssb_19920101000000_19920201000000.csv", 1000, 1); - } - - public void testBuild(String cubeName, String flatTable, int nInpRows, int nThreads) throws Exception { - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - CubeManager cubeManager = CubeManager.getInstance(kylinConfig); - - this.nInpRows = nInpRows; - this.nThreads = nThreads; - - this.cube = cubeManager.getCube(cubeName); - this.flatTable = flatTable; - this.dictionaryMap = getDictionaryMap(cube, flatTable); - - testBuildInner(); - } - - private void testBuildInner() throws Exception { - - InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap); - //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); - cubeBuilder.setConcurrentThreads(nThreads); - - ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000); - ExecutorService executorService = Executors.newSingleThreadExecutor(); - - try { - // round 1 - { - Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter())); - feedData(cube, flatTable, queue, nInpRows); - future.get(); - } - - // round 2, zero input - { - Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter())); - feedData(cube, flatTable, queue, 0); - future.get(); - } - - // round 3 - { - Future<?> future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new ConsoleGTRecordWriter())); - feedData(cube, flatTable, queue, nInpRows); - future.get(); - } - - } catch (Exception e) { - logger.error("stream build failed", e); - throw new IOException("Failed to build cube ", e); - } - } - - static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count) throws IOException, InterruptedException { - feedData(cube, flatTable, queue, count, 0); - } - - static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException { - CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), null); - int nColumns = flatTableDesc.getColumnList().size(); - - @SuppressWarnings("unchecked") - Set<String>[] distinctSets = new Set[nColumns]; - for (int i = 0; i < nColumns; i++) - distinctSets[i] = new TreeSet<String>(); - - // get distinct values on each column - List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8"); - for (String line : lines) { - String[] row = line.trim().split(","); - assert row.length == nColumns; - for (int i = 0; i < nColumns; i++) - distinctSets[i].add(row[i]); - } - - List<String[]> distincts = new ArrayList<String[]>(); - for (int i = 0; i < nColumns; i++) { - distincts.add((String[]) distinctSets[i].toArray(new String[distinctSets[i].size()])); - } - - Random rand = new Random(); - if (randSeed != 0) - rand.setSeed(randSeed); - - // output with random data - for (; count > 0; count--) { - ArrayList<String> row = new ArrayList<String>(nColumns); - for (int i = 0; i < nColumns; i++) { - String[] candidates = distincts.get(i); - row.add(candidates[rand.nextInt(candidates.length)]); - } - queue.put(row); - } - queue.put(new ArrayList<String>(0)); - } - - static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException { - Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); - CubeDesc desc = cube.getDescriptor(); - CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null); - int nColumns = flatTableDesc.getColumnList().size(); - - List<TblColRef> columns = Cuboid.getBaseCuboid(desc).getColumns(); - for (int c = 0; c < columns.size(); c++) { - TblColRef col = columns.get(c); - if (desc.getRowkey().isUseDictionary(col)) { - logger.info("Building dictionary for " + col); - List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]); - Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList)); - result.put(col, dict); - } - } - - for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) { - MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx); - FunctionDesc func = measureDesc.getFunction(); - List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func); - if (dictCols.isEmpty()) - continue; - - int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx]; - List<TblColRef> paramCols = func.getParameter().getColRefs(); - for (int i = 0; i < paramCols.size(); i++) { - TblColRef col = paramCols.get(i); - if (dictCols.contains(col)) { - int colIdxOnFlat = flatTableIdx[i]; - logger.info("Building dictionary for " + col); - List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat); - Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList)); - - result.put(col, dict); - } - } - } - - return result; - } - - private static List<byte[]> readValueList(String flatTable, int nColumns, int c) throws IOException { - List<byte[]> result = Lists.newArrayList(); - List<String> lines = FileUtils.readLines(new File(flatTable), "UTF-8"); - for (String line : lines) { - String[] row = line.trim().split(","); - if (row.length != nColumns) { - throw new IllegalStateException(); - } - if (row[c] != null) { - result.add(Bytes.toBytes(row[c])); - } - } - return result; - } - - class ConsoleGTRecordWriter implements ICuboidWriter { - - boolean verbose = false; - - @Override - public void write(long cuboidId, GTRecord record) throws IOException { - if (verbose) - System.out.println(record.toString()); - } - - @Override - public void flush() { - if (verbose) { - System.out.println("flush"); - } - } - - @Override - public void close() { - if (verbose) { - System.out.println("close"); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/b26b2489/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java index b1b5ee9..b411e96 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java @@ -101,11 +101,11 @@ public class DictGridTableTest { ByteArray segmentEnd = enc(info, 0, "2015-01-15"); assertEquals(segmentStart, segmentStartX); - GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0)); { LogicalTupleFilter filter = and(timeComp0, ageComp1); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size());//scan range are [close,close] assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); assertEquals(1, r.get(0).fuzzyKeys.size()); @@ -113,29 +113,34 @@ public class DictGridTableTest { } { LogicalTupleFilter filter = and(timeComp2, ageComp1); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(0, r.size()); } { LogicalTupleFilter filter = and(timeComp4, ageComp1); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size()); } { LogicalTupleFilter filter = and(timeComp5, ageComp1); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(0, r.size()); } { LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1)); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size()); assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString()); assertEquals("[[10], [1421193600000, 10]]", r.get(0).fuzzyKeys.toString()); } { LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size()); assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString()); assertEquals(0, r.get(0).fuzzyKeys.size()); @@ -143,20 +148,23 @@ public class DictGridTableTest { { //skip FALSE filter LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(0, r.size()); } { //TRUE or FALSE filter LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size()); assertEquals("[null, null]-[null, null]", r.get(0).toString()); } { //TRUE or other filter LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size()); assertEquals("[null, null]-[null, null]", r.get(0).toString()); } @@ -165,11 +173,11 @@ public class DictGridTableTest { @Test public void verifySegmentSkipping2() { ByteArray segmentEnd = enc(info, 0, "2015-01-15"); - GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0)); { LogicalTupleFilter filter = and(timeComp0, ageComp1); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size());//scan range are [close,close] assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); assertEquals(1, r.get(0).fuzzyKeys.size()); @@ -178,7 +186,8 @@ public class DictGridTableTest { { LogicalTupleFilter filter = and(timeComp5, ageComp1); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), info.colRef(0),filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(0, r.size());//scan range are [close,close] } } @@ -186,12 +195,12 @@ public class DictGridTableTest { @Test public void verifyScanRangePlanner() { - GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null); // flatten or-and & hbase fuzzy value { LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2)); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size()); assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString()); assertEquals("[[10], [20]]", r.get(0).fuzzyKeys.toString()); @@ -200,33 +209,38 @@ public class DictGridTableTest { // pre-evaluate ever false { LogicalTupleFilter filter = and(timeComp1, timeComp2); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(0, r.size()); } // pre-evaluate ever true { LogicalTupleFilter filter = or(timeComp1, ageComp4); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals("[[null, null]-[null, null]]", r.toString()); } // merge overlap range { LogicalTupleFilter filter = or(timeComp1, timeComp3); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals("[[null, null]-[null, null]]", r.toString()); } // merge too many ranges { LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3)); - List<GTScanRange> r = planner.planScanRanges(filter); + GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null,filter); + List<GTScanRange> r = planner.planScanRanges(); assertEquals(3, r.size()); assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString()); assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString()); assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString()); - List<GTScanRange> r2 = planner.planScanRanges(filter, 2); + planner.setMaxScanRanges(2); + List<GTScanRange> r2 = planner.planScanRanges(); assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString()); } } @@ -269,7 +283,7 @@ public class DictGridTableTest { GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0); // note the unEvaluatable column 1 in filter is added to group by - assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); + assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]", "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]", "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]"); } @@ -284,7 +298,7 @@ public class DictGridTableTest { GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0); // note the evaluatable column 1 in filter is added to returned columns but not in group by - assertEquals("GTScanRequest [range=[null, null]-[null, null], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); + assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString()); doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]", "[1421366400000, 20, null, 40, null]"); }
