KYLIN-1922 refactors
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4e8ed97d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4e8ed97d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4e8ed97d Branch: refs/heads/KYLIN-1726 Commit: 4e8ed97d12c53c19d09f736be3baaa9112dcf413 Parents: a201c5b Author: Hongbin Ma <mahong...@apache.org> Authored: Mon Sep 12 23:52:43 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Mon Sep 12 23:53:48 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/CubeInstance.java | 10 +- .../org/apache/kylin/cube/model/CubeDesc.java | 3 +- .../apache/kylin/gridtable/GTScanRequest.java | 4 +- .../metadata/realization/IRealization.java | 1 + .../apache/kylin/storage/StorageContext.java | 34 ++++- .../storage/gtrecord/CubeScanRangePlanner.java | 6 +- .../gtrecord/GTCubeStorageQueryBase.java | 10 +- .../gtrecord/SequentialCubeTupleIterator.java | 7 +- .../gtrecord/StorageResponseGTScatter.java | 117 +++++++++++++++ .../kylin/storage/hybrid/HybridInstance.java | 5 + .../apache/kylin/query/ITCombinationTest.java | 6 +- .../apache/kylin/query/ITKylinQueryTest.java | 150 +++++++------------ .../org/apache/kylin/query/KylinTestBase.java | 56 +++++++ .../src/test/resources/query/sql/query45.sql | 23 +++ .../test/resources/query/sql_limit/query01.sql | 21 +++ .../test/resources/query/sql_limit/query02.sql | 24 +++ .../query/sql_optimize/enable-limit01.sql | 19 --- .../resources/query/sql_timeout/query02.sql | 19 +++ .../rules/RemoveBlackoutRealizationsRule.java | 11 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 3 +- .../hbase/cube/v2/ExpectedSizeIterator.java | 2 +- .../storage/hbase/cube/v2/GTBlobScatter.java | 150 ------------------- .../coprocessor/endpoint/CubeVisitService.java | 1 + 23 files changed, 393 insertions(+), 289 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 151e142..851b016 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -35,17 +35,17 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.CapabilityResult; -import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence; import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonManagedReference; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.google.common.base.Objects; import com.google.common.collect.Lists; @@ -305,7 +305,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return result; } - public CubeSegment getSegment(String name, SegmentStatusEnum status) { for (CubeSegment segment : segments) { @@ -404,6 +403,11 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return endTime; } + @Override + public boolean supportsLimitPushDown() { + return getDescriptor().supportsLimitPushDown(); + } + public int getRowKeyColumnCount() { return getDescriptor().getRowkey().getRowKeyColumns().length; } http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 2d9945a..e6b3d3f 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -992,8 +992,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } public boolean supportsLimitPushDown() { - //currently only ID_SHARDED_HBASE supports limit push down - return getStorageType() == IStorageAware.ID_SHARDED_HBASE; + return getStorageType() != IStorageAware.ID_HBASE && getStorageType() != IStorageAware.ID_HYBRID; } public int getStorageType() { http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 4f68806..dc90ed6 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -42,8 +42,10 @@ import com.google.common.collect.Sets; public class GTScanRequest { private static final Logger logger = LoggerFactory.getLogger(GTScanRequest.class); + //it's not necessary to increase the checkInterval to very large because the check cost is not high - public static final int terminateCheckInterval = 1000; + //changing it might break org.apache.kylin.query.ITKylinQueryTest.testTimeoutQuery() + public static final int terminateCheckInterval = 100; private GTInfo info; private List<GTScanRange> ranges; http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java index fda05ce..040cdc5 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java @@ -57,4 +57,5 @@ public interface IRealization extends IStorageAware { public long getDateRangeEnd(); + public boolean supportsLimitPushDown(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java index acb4960..cc39918 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java @@ -20,7 +20,11 @@ package org.apache.kylin.storage; import java.util.concurrent.atomic.AtomicLong; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.metadata.realization.IRealization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Range; @@ -28,6 +32,7 @@ import com.google.common.collect.Range; * @author xjiang */ public class StorageContext { + private static final Logger logger = LoggerFactory.getLogger(StorageContext.class); public static final int DEFAULT_THRESHOLD = 1000000; @@ -35,6 +40,7 @@ public class StorageContext { private int threshold; private int limit; private int offset; + private int finalPushDownLimit; private boolean hasSort; private boolean acceptPartialResult; @@ -62,6 +68,7 @@ public class StorageContext { this.acceptPartialResult = false; this.partialResultReturned = false; + this.finalPushDownLimit = Integer.MAX_VALUE; } public String getConnUrl() { @@ -104,10 +111,33 @@ public class StorageContext { return this.enableLimit; } - public int getStoragePushDownLimit() { + private int getStoragePushDownLimit() { return this.isLimitEnabled() ? this.getOffset() + this.getLimit() : Integer.MAX_VALUE; } - + + public int getFinalPushDownLimit() { + return finalPushDownLimit; + } + + public void setFinalPushDownLimit(IRealization realization) { + + //decide the final limit push down + int tempPushDownLimit = this.getStoragePushDownLimit(); + if (tempPushDownLimit == Integer.MAX_VALUE) { + return; + } + + int pushDownLimitMax = KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax(); + if (!realization.supportsLimitPushDown()) { + logger.info("Not enabling limit push down because cube storage type not supported"); + } else if (tempPushDownLimit > pushDownLimitMax) { + logger.info("Not enabling limit push down because the limit(including offset) {} is larger than kylin.query.pushdown.limit.max {}", // + tempPushDownLimit, pushDownLimitMax); + } else { + this.finalPushDownLimit = tempPushDownLimit; + } + } + public void markSort() { this.hasSort = true; } http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java index 9f505f3..b011f40 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java @@ -152,9 +152,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { setAllowStorageAggregation(context.isNeedStorageAggregation()).setAggCacheMemThreshold(cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB()).// setStorageScanRowNumThreshold(context.getThreshold()); - if (cubeDesc.supportsLimitPushDown()) { - builder.setStoragePushDownLimit(context.getStoragePushDownLimit()); - } + if (context.getFinalPushDownLimit() != Integer.MAX_VALUE) + builder.setStoragePushDownLimit(context.getFinalPushDownLimit()); + scanRequest = builder.createGTScanRequest(); } else { scanRequest = null; http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index f0c2494..31663d0 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -113,9 +113,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { // replace derived columns in filter with host columns; columns on loosened condition must be added to group by TupleFilter filterD = translateDerived(filter, groupsD); + //set whether to aggr at storage context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD)); + // set limit push down enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context); - setThresholdIfNecessary(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory + context.setFinalPushDownLimit(cubeInstance); + // set cautious threshold to prevent out of memory + setThresholdIfNecessary(dimensionsD, metrics, context); List<CubeSegmentScanner> scanners = Lists.newArrayList(); for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { @@ -135,7 +139,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { if (scanners.isEmpty()) return ITupleIterator.EMPTY_TUPLE_ITERATOR; - return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context, cubeDesc.supportsLimitPushDown()); + return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context); } protected boolean skipZeroInputSegment(CubeSegment cubeSegment) { @@ -398,7 +402,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) { boolean possible = true; - boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled()); + boolean goodFilter = filter == null || TupleFilter.isEvaluableRecursively(filter); if (!goodFilter) { possible = false; logger.info("Storage limit push down is impossible because the filter is unevaluatable"); http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index 7059473..bef0e88 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -25,7 +25,6 @@ import java.util.Set; import javax.annotation.Nullable; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -54,7 +53,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { private int scanCountDelta; public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, // - Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context, boolean supportLimitPushDown) { + Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) { this.context = context; this.scanners = scanners; @@ -63,8 +62,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator { segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context)); } - this.storagePushDownLimit = context.getStoragePushDownLimit(); - if (!supportLimitPushDown || storagePushDownLimit > KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) { + this.storagePushDownLimit = context.getFinalPushDownLimit(); + if (storagePushDownLimit == Integer.MAX_VALUE) { //normal case tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator()); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java new file mode 100644 index 0000000..fe1afd3 --- /dev/null +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.gtrecord; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; + +import javax.annotation.Nullable; + +import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.IGTScanner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; + +/** + * scatter the blob returned from region server to a iterable of gtrecords + */ +public class StorageResponseGTScatter implements IGTScanner { + + private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class); + + private GTInfo info; + private Iterator<byte[]> blocks; + private ImmutableBitSet columns; + private long totalScannedCount; + private int storagePushDownLimit = -1; + + public StorageResponseGTScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) { + this.info = info; + this.blocks = blocks; + this.columns = columns; + this.totalScannedCount = totalScannedCount; + this.storagePushDownLimit = storagePushDownLimit; + } + + @Override + public GTInfo getInfo() { + return info; + } + + @Override + public long getScannedRowCount() { + return totalScannedCount; + } + + @Override + public void close() throws IOException { + //do nothing + } + + @Override + public Iterator<GTRecord> iterator() { + Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc()); + if (storagePushDownLimit != Integer.MAX_VALUE) { + return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator(); + } else { + return Iterators.concat(shardSubsets); + } + } + + class EndpointResponseGTScatterFunc implements Function<byte[], Iterator<GTRecord>> { + @Nullable + @Override + public Iterator<GTRecord> apply(@Nullable final byte[] input) { + + return new Iterator<GTRecord>() { + private ByteBuffer inputBuffer = null; + //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord + private GTRecord firstRecord = null; + + @Override + public boolean hasNext() { + if (inputBuffer == null) { + inputBuffer = ByteBuffer.wrap(input); + firstRecord = new GTRecord(info); + } + + return inputBuffer.position() < inputBuffer.limit(); + } + + @Override + public GTRecord next() { + firstRecord.loadColumns(columns, inputBuffer); + return firstRecord; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java index a0262e3..9b3a0fc 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java @@ -257,6 +257,11 @@ public class HybridInstance extends RootPersistentEntity implements IRealization } @Override + public boolean supportsLimitPushDown() { + return false; + } + + @Override public List<TblColRef> getAllDimensions() { init(); return allDimensions; http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java index cbd4e44..f4667af 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java @@ -85,10 +85,10 @@ public class ITCombinationTest extends ITKylinQueryTest { // unset } - RemoveBlackoutRealizationsRule.blackouts.clear(); + RemoveBlackoutRealizationsRule.blackList.clear(); if (excludeViewCubes) { - RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]"); - RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]"); + RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]"); + RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]"); } if ("v1".equalsIgnoreCase(queryEngine)) http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index c1c9767..b9895e8 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -21,31 +21,22 @@ package org.apache.kylin.query; import static org.junit.Assert.assertTrue; import java.io.File; -import java.sql.DriverManager; import java.sql.SQLException; import java.util.List; import java.util.Map; -import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; -import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.gridtable.GTScanSelfTerminatedException; +import org.apache.kylin.gridtable.GTScanTimeoutException; import org.apache.kylin.gridtable.StorageSideBehavior; -import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.query.enumerator.OLAPQuery; -import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.query.routing.Candidate; import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule; -import org.apache.kylin.query.schema.OLAPSchemaFactory; import org.apache.kylin.storage.hbase.HBaseStorage; -import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; import org.dbunit.database.DatabaseConnection; import org.dbunit.database.IDatabaseConnection; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; @@ -73,8 +64,8 @@ public class ITKylinQueryTest extends KylinTestBase { setupAll(); - RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]"); - RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]"); + RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]"); + RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]"); } @AfterClass @@ -84,105 +75,69 @@ public class ITKylinQueryTest extends KylinTestBase { clean(); } - protected static void setupAll() throws Exception { - //setup env - HBaseMetadataTestCase.staticCreateTestMetadata(); - config = KylinConfig.getInstanceFromEnv(); - - //setup cube conn - File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config); - Properties props = new Properties(); - props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10001"); - cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props); - - //setup h2 - h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", ""); - // Load H2 Tables (inner join) - H2Database h2DB = new H2Database(h2Connection, config); - h2DB.loadAllTables(); - - } - - protected static void clean() { - if (cubeConnection != null) - closeConnection(cubeConnection); - if (h2Connection != null) - closeConnection(h2Connection); - - ObserverEnabler.forceCoprocessorUnset(); - HBaseMetadataTestCase.staticCleanupTestMetadata(); - RemoveBlackoutRealizationsRule.blackouts.clear(); - - } - protected String getQueryFolderPrefix() { return ""; } + protected Throwable findRoot(Throwable throwable) { + while (true) { + if (throwable.getCause() != null) { + throwable = throwable.getCause(); + } else { + break; + } + } + return throwable; + } + @Test public void testTimeoutQuery() throws Exception { if (HBaseStorage.overwriteStorageQuery != null) { //v1 engine does not suit return; } - - thrown.expect(SQLException.class); - - //should not break at table duplicate check, should fail at model duplicate check - thrown.expect(new BaseMatcher<Throwable>() { - @Override - public boolean matches(Object item) { - - //find the "root" - Throwable throwable = (Throwable) item; - while (true) { - if (throwable.getCause() != null) { - throwable = throwable.getCause(); - } else { - break; - } - } - - if (throwable instanceof GTScanSelfTerminatedException) { - return true; - } - return false; - } - - @Override - public void describeTo(Description description) { - } - }); - - runTimetoutQueries(); - - } - - protected void runTimetoutQueries() throws Exception { try { Map<String, String> toggles = Maps.newHashMap(); toggles.put(BackdoorToggles.DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM_WITHDELAY.toString());//delay 10ms for every scan BackdoorToggles.setToggles(toggles); - KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.03");//set timeout to 9s + KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "0.01");//set timeout to 3s //these two cubes has RAW measure, will disturb limit push down - RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]"); - RemoveBlackoutRealizationsRule.blackouts.add("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]"); + RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_without_slr_left_join_empty]"); + RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]"); - execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_timeout", null, true); + runTimeoutQueries(); } finally { //these two cubes has RAW measure, will disturb limit push down - RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]"); - RemoveBlackoutRealizationsRule.blackouts.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]"); + RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_left_join_empty]"); + RemoveBlackoutRealizationsRule.blackList.remove("CUBE[name=test_kylin_cube_without_slr_inner_join_empty]"); KylinConfig.getInstanceFromEnv().setProperty("kylin.query.cube.visit.timeout.times", "1");//set timeout to 9s BackdoorToggles.cleanToggles(); } } + protected void runTimeoutQueries() throws Exception { + List<File> sqlFiles = getFilesFromFolder(new File(getQueryFolderPrefix() + "src/test/resources/query/sql_timeout"), ".sql"); + for (File sqlFile : sqlFiles) { + try { + runSQL(sqlFile, false, false); + } catch (SQLException e) { + + System.out.println(e.getMessage()); + + if (findRoot(e) instanceof GTScanSelfTerminatedException) { + //expected + continue; + } + } + throw new RuntimeException("Expecting GTScanTimeoutException"); + } + } + //don't try to ignore this test, try to clean your "temp" folder @Test public void testTempQuery() throws Exception { @@ -346,11 +301,25 @@ public class ITKylinQueryTest extends KylinTestBase { execAndCompDynamicQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_dynamic", null, true); } - @Ignore("simple query will be supported by ii") @Test public void testLimitEnabled() throws Exception { - runSqlFile(getQueryFolderPrefix() + "src/test/resources/query/sql_optimize/enable-limit01.sql"); - assertLimitWasEnabled(); + if (HBaseStorage.overwriteStorageQuery == null) {//v1 query engine will not work + + try { + //other cubes have strange aggregation groups + RemoveBlackoutRealizationsRule.whiteList.add("CUBE[name=test_kylin_cube_with_slr_empty]"); + + List<File> sqlFiles = getFilesFromFolder(new File(getQueryFolderPrefix() + "src/test/resources/query/sql_limit"), ".sql"); + for (File sqlFile : sqlFiles) { + runSQL(sqlFile, false, false); + assertTrue(checkLimitEnabled()); + assertTrue(checkFinalPushDownLimit()); + } + + } finally { + RemoveBlackoutRealizationsRule.whiteList.remove("CUBE[name=test_kylin_cube_with_slr_empty]"); + } + } } @Test @@ -377,13 +346,4 @@ public class ITKylinQueryTest extends KylinTestBase { this.batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_window"); } - private void assertLimitWasEnabled() { - OLAPContext context = getFirstOLAPContext(); - assertTrue(context.storageContext.isLimitEnabled()); - } - - private OLAPContext getFirstOLAPContext() { - return OLAPContext.getThreadLocalContexts().iterator().next(); - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java index 2ad1105..294750e 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -38,12 +39,20 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.TreeSet; import java.util.logging.LogManager; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.query.enumerator.OLAPQuery; +import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule; +import org.apache.kylin.query.schema.OLAPSchemaFactory; +import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; import org.dbunit.Assertion; import org.dbunit.database.DatabaseConfig; import org.dbunit.database.DatabaseConnection; @@ -568,4 +577,51 @@ public class KylinTestBase { printInfo(sb.toString()); return count; } + + protected static void setupAll() throws Exception { + //setup env + HBaseMetadataTestCase.staticCreateTestMetadata(); + config = KylinConfig.getInstanceFromEnv(); + + //setup cube conn + File olapTmp = OLAPSchemaFactory.createTempOLAPJson(ProjectInstance.DEFAULT_PROJECT_NAME, config); + Properties props = new Properties(); + props.setProperty(OLAPQuery.PROP_SCAN_THRESHOLD, "10001"); + cubeConnection = DriverManager.getConnection("jdbc:calcite:model=" + olapTmp.getAbsolutePath(), props); + + //setup h2 + h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++), "sa", ""); + // Load H2 Tables (inner join) + H2Database h2DB = new H2Database(h2Connection, config); + h2DB.loadAllTables(); + + } + + protected static void clean() { + if (cubeConnection != null) + closeConnection(cubeConnection); + if (h2Connection != null) + closeConnection(h2Connection); + + ObserverEnabler.forceCoprocessorUnset(); + HBaseMetadataTestCase.staticCleanupTestMetadata(); + RemoveBlackoutRealizationsRule.blackList.clear(); + + } + + protected boolean checkLimitEnabled() { + OLAPContext context = getFirstOLAPContext(); + return (context.storageContext.isLimitEnabled()); + } + + protected boolean checkFinalPushDownLimit() { + OLAPContext context = getFirstOLAPContext(); + return (context.storageContext.getFinalPushDownLimit() != Integer.MAX_VALUE); + + } + + private OLAPContext getFirstOLAPContext() { + return OLAPContext.getThreadLocalContexts().iterator().next(); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql/query45.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql/query45.sql b/kylin-it/src/test/resources/query/sql/query45.sql new file mode 100644 index 0000000..0c78657 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql/query45.sql @@ -0,0 +1,23 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + + + +select seller_id, sum(price) from test_kylin_fact + where lstg_format_name='FP-GTC' + group by seller_id limit 20 http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_limit/query01.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_limit/query01.sql b/kylin-it/src/test/resources/query/sql_limit/query01.sql new file mode 100644 index 0000000..fca8175 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_limit/query01.sql @@ -0,0 +1,21 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +select * from test_kylin_fact + where lstg_format_name='FP-GTC' + limit 20 http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_limit/query02.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_limit/query02.sql b/kylin-it/src/test/resources/query/sql_limit/query02.sql new file mode 100644 index 0000000..53f7bd7 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_limit/query02.sql @@ -0,0 +1,24 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + + + +select seller_id, sum(price) from test_kylin_fact + where lstg_format_name='FP-GTC' + group by seller_id limit 20 + http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql b/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql deleted file mode 100644 index 4a62d92..0000000 --- a/kylin-it/src/test/resources/query/sql_optimize/enable-limit01.sql +++ /dev/null @@ -1,19 +0,0 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one --- or more contributor license agreements. See the NOTICE file --- distributed with this work for additional information --- regarding copyright ownership. The ASF licenses this file --- to you under the Apache License, Version 2.0 (the --- "License"); you may not use this file except in compliance --- with the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- - -select * from test_kylin_fact limit 10 http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/kylin-it/src/test/resources/query/sql_timeout/query02.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_timeout/query02.sql b/kylin-it/src/test/resources/query/sql_timeout/query02.sql new file mode 100644 index 0000000..2f187a4 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_timeout/query02.sql @@ -0,0 +1,19 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +select seller_id,lstg_format_name,sum(price) from test_kylin_fact group by seller_id,lstg_format_name \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java index 9c3d7c9..f299d17 100644 --- a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java +++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveBlackoutRealizationsRule.java @@ -31,15 +31,22 @@ import com.google.common.collect.Sets; * for IT use, exclude some cubes */ public class RemoveBlackoutRealizationsRule extends RoutingRule { - public static Set<String> blackouts = Sets.newHashSet(); + public static Set<String> blackList = Sets.newHashSet(); + public static Set<String> whiteList = Sets.newHashSet(); @Override public void apply(List<Candidate> candidates) { for (Iterator<Candidate> iterator = candidates.iterator(); iterator.hasNext();) { Candidate candidate = iterator.next(); - if (blackouts.contains(candidate.getRealization().getCanonicalName())) { + if (blackList.contains(candidate.getRealization().getCanonicalName())) { iterator.remove(); + continue; + } + + if (!whiteList.isEmpty() && !whiteList.contains(candidate.getRealization().getCanonicalName())) { + iterator.remove(); + continue; } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 573951b..c7de287 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -45,6 +45,7 @@ import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos; import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest; @@ -222,7 +223,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { }); } - return new GTBlobScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit()); + return new StorageResponseGTScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit()); } private ByteString serializeGTScanReq(GTScanRequest scanRequest) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java index f4729a3..c27e5fc 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java @@ -85,7 +85,7 @@ class ExpectedSizeIterator implements Iterator<byte[]> { byte[] ret = null; while (ret == null && coprocException == null && timeoutTS > System.currentTimeMillis()) { - ret = queue.poll(5000, TimeUnit.MILLISECONDS); + ret = queue.poll(10000, TimeUnit.MILLISECONDS); } if (coprocException != null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java deleted file mode 100644 index 631510e..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.cube.v2; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Iterator; - -import javax.annotation.Nullable; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.gridtable.GTInfo; -import org.apache.kylin.gridtable.GTRecord; -import org.apache.kylin.gridtable.IGTScanner; -import org.apache.kylin.storage.gtrecord.SortedIteratorMergerWithLimit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.collect.Iterators; - -/** - * scatter the blob returned from region server to a iterable of gtrecords - */ -class GTBlobScatter implements IGTScanner { - - private static final Logger logger = LoggerFactory.getLogger(GTBlobScatter.class); - - private GTInfo info; - private Iterator<byte[]> blocks; - private ImmutableBitSet columns; - private long totalScannedCount; - private int storagePushDownLimit = -1; - - public GTBlobScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) { - this.info = info; - this.blocks = blocks; - this.columns = columns; - this.totalScannedCount = totalScannedCount; - this.storagePushDownLimit = storagePushDownLimit; - } - - @Override - public GTInfo getInfo() { - return info; - } - - @Override - public long getScannedRowCount() { - return totalScannedCount; - } - - @Override - public void close() throws IOException { - //do nothing - } - - @Override - public Iterator<GTRecord> iterator() { - Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new GTBlobScatterFunc()); - if (storagePushDownLimit <= KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) { - return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator(); - } else { - return Iterators.concat(shardSubsets); - } - } - - class GTBlobScatterFunc implements Function<byte[], Iterator<GTRecord>> { - @Nullable - @Override - public Iterator<GTRecord> apply(@Nullable final byte[] input) { - - return new Iterator<GTRecord>() { - private ByteBuffer inputBuffer = null; - //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord - private GTRecord firstRecord = null; - private GTRecord secondRecord = null; - private GTRecord thirdRecord = null; - private GTRecord fourthRecord = null; - private int counter = 0; - - @Override - public boolean hasNext() { - if (inputBuffer == null) { - inputBuffer = ByteBuffer.wrap(input); - firstRecord = new GTRecord(info); - secondRecord = new GTRecord(info); - thirdRecord = new GTRecord(info); - fourthRecord = new GTRecord(info); - } - - return inputBuffer.position() < inputBuffer.limit(); - } - - @Override - public GTRecord next() { - firstRecord.loadColumns(columns, inputBuffer); - //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord)); - return firstRecord; - // GTRecord temp = new GTRecord(info); - // temp.loadColumns(columns, inputBuffer); - // return temp; - - // counter++; - // int index = counter % 4; - // if (index == 1) { - // firstRecord.loadColumns(columns, inputBuffer); - // //logger.info("A GTRecord: " + System.identityHashCode(this) + " " + firstRecord + " " + System.identityHashCode(firstRecord)); - // return firstRecord; - // } else if (index == 2) { - // secondRecord.loadColumns(columns, inputBuffer); - // //logger.info("B GTRecord: " + System.identityHashCode(this) + " " + secondRecord + " " + System.identityHashCode(secondRecord)); - // return secondRecord; - // } else if (index == 3) { - // thirdRecord.loadColumns(columns, inputBuffer); - // //logger.info("C GTRecord: " + System.identityHashCode(this) + " " + thirdRecord + " " + System.identityHashCode(thirdRecord)); - // return thirdRecord; - // } else { - // fourthRecord.loadColumns(columns, inputBuffer); - // //logger.info("D GTRecord: " + System.identityHashCode(this) + " " + fourthRecord + " " + System.identityHashCode(fourthRecord)); - // return fourthRecord; - // } - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/4e8ed97d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index ffe41c5..13a7b53 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -246,6 +246,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement final MutableBoolean scanNormalComplete = new MutableBoolean(true); final long deadline = scanReq.getTimeout() + this.serviceStartTime; + logger.info("deadline is " + deadline); final long storagePushDownLimit = scanReq.getStoragePushDownLimit(); final CellListIterator cellListIterator = new CellListIterator() {