This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch sync in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 18d8663f5e0902ca8093b8e3d6ca89993c1b5d3f Author: ZhansShaoxiong <[email protected]> AuthorDate: Mon Jun 18 10:55:51 2018 +0800 KYLIN-3370 enhance segment pruning --- .../java/org/apache/kylin/cube/CubeSegment.java | 12 + .../org/apache/kylin/cube/DimensionRangeInfo.java | 104 ++++++++ .../apache/kylin/cube/common/SegmentPruner.java | 168 +++++++++++++ .../kylin/cube/gridtable/ScanRangePlannerBase.java | 4 +- .../apache/kylin/cube/DimensionRangeInfoTest.java | 86 +++++++ .../kylin/cube/common/SegmentPrunerTest.java | 186 ++++++++++++++ .../apache/kylin/metadata/filter/TupleFilter.java | 28 ++- .../kylin/metadata/datatype/DataTypeOrderTest.java | 57 +++++ .../kylin/metadata/filter/TupleFilterTest.java | 23 ++ .../storage/gtrecord/CubeScanRangePlanner.java | 30 +-- .../storage/gtrecord/GTCubeStorageQueryBase.java | 9 +- .../kylin/storage/gtrecord/DictGridTableTest.java | 69 +++--- .../mr/steps/CalculateStatsFromBaseCuboidJob.java | 1 - .../mr/steps/FactDistinctColumnPartitioner.java | 5 +- .../engine/mr/steps/FactDistinctColumnsJob.java | 2 +- .../engine/mr/steps/FactDistinctColumnsMapper.java | 43 +--- .../mr/steps/FactDistinctColumnsMapperBase.java | 20 +- .../mr/steps/FactDistinctColumnsReducer.java | 90 ++++--- .../steps/FactDistinctColumnsReducerMapping.java | 87 +++---- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 83 ++++--- .../mr/steps/UpdateCubeInfoAfterMergeStep.java | 16 +- .../FactDistinctColumnsReducerMappingTest.java | 12 +- .../cube/ssb_cube_with_dimention_range.json | 110 +++++++++ .../cube_desc/ssb_cube_with_dimention_range.json | 269 +++++++++++++++++++++ .../kylin/provision/BuildCubeWithEngine.java | 86 +++++-- 25 files changed, 1312 insertions(+), 288 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index d49c273..75c90a4 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -119,6 +119,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable { @JsonInclude(JsonInclude.Include.NON_EMPTY) private Map<String, String> additionalInfo = new LinkedHashMap<String, String>(); + @JsonProperty("dimension_range_info_map") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + private Map<String, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap(); + private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid // lazy init @@ -575,4 +579,12 @@ public class CubeSegment implements IBuildable, ISegment, Serializable { public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) { this.sourcePartitionOffsetStart = sourcePartitionOffsetStart; } + + public Map<String, DimensionRangeInfo> getDimensionRangeInfoMap() { + return dimensionRangeInfoMap; + } + + public void setDimensionRangeInfoMap(Map<String, DimensionRangeInfo> dimensionRangeInfoMap) { + this.dimensionRangeInfoMap = dimensionRangeInfoMap; + } } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java new file mode 100644 index 0000000..e36ca96 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube; + +import java.util.Map; + +import org.apache.kylin.metadata.datatype.DataTypeOrder; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Maps; + +@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) +public class DimensionRangeInfo { + + private static final Logger logger = LoggerFactory.getLogger(DimensionRangeInfo.class); + + public static Map<String, DimensionRangeInfo> mergeRangeMap(DataModelDesc model, Map<String, DimensionRangeInfo> m1, + Map<String, DimensionRangeInfo> m2) { + + if (!m1.keySet().equals(m2.keySet())) { + logger.warn("Merging incompatible maps of DimensionRangeInfo, keys in m1 " + m1.keySet() + ", keys in m2 " + + m2.keySet()); + } + + Map<String, DimensionRangeInfo> result = Maps.newHashMap(); + + for (String colId : m1.keySet()) { + if (!m2.containsKey(colId)) + continue; + + DimensionRangeInfo r1 = m1.get(colId); + DimensionRangeInfo r2 = m2.get(colId); + + DimensionRangeInfo newR; + if (r1.getMin() == null && r1.getMax() == null) { + newR = r2; // when r1 is all null or has 0 records + } else if (r2.getMin() == null && r2.getMax() == null) { + newR = r1; // when r2 is all null or has 0 records + } else { + DataTypeOrder order = model.findColumn(colId).getType().getOrder(); + String newMin = order.min(r1.getMin(), r2.getMin()); + String newMax = order.max(r1.getMax(), r2.getMax()); + newR = new DimensionRangeInfo(newMin, newMax); + } + + result.put(colId, newR); + } + + return result; + } + + // ============================================================================ + + @JsonProperty("min") + private String min; + + @JsonProperty("max") + private String max; + + public DimensionRangeInfo() {} + + public DimensionRangeInfo(String min, String max) { + if (min == null && max != null || min != null && max == null) + throw new IllegalStateException(); + + this.min = min; + this.max = max; + } + + public String getMin() { + return min; + } + + public String getMax() { + return max; + } + + @Override + public String toString() { + return "[" + min + ", " + max + "]"; + } + +} diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java new file mode 100644 index 0000000..098f334 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.common; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.DimensionRangeInfo; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeOrder; +import org.apache.kylin.metadata.filter.CompareTupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SegmentPruner { + private static final Logger logger = LoggerFactory.getLogger(SegmentPruner.class); + + final private Set<CompareTupleFilter> mustTrueCompares; + + public SegmentPruner(TupleFilter filter) { + this.mustTrueCompares = filter == null ? Collections.<CompareTupleFilter> emptySet() + : filter.findMustTrueCompareFilters(); + } + + public List<CubeSegment> listSegmentsForQuery(CubeInstance cube) { + List<CubeSegment> r = new ArrayList<>(); + for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) { + if (check(seg)) + r.add(seg); + } + return r; + } + + public boolean check(CubeSegment seg) { + + if (seg.getInputRecords() == 0) { + if (seg.getConfig().isSkippingEmptySegments()) { + logger.debug("Prune segment {} due to 0 input record", seg); + return false; + } else { + logger.debug("Insist scan of segment {} having 0 input record", seg); + } + } + + Map<String, DimensionRangeInfo> segDimRangInfoMap = seg.getDimensionRangeInfoMap(); + for (CompareTupleFilter comp : mustTrueCompares) { + TblColRef col = comp.getColumn(); + + DimensionRangeInfo dimRangeInfo = segDimRangInfoMap.get(col.getIdentity()); + if (dimRangeInfo == null) + dimRangeInfo = tryDeduceRangeFromPartitionCol(seg, col); + if (dimRangeInfo == null) + continue; + + String minVal = dimRangeInfo.getMin(); + String maxVal = dimRangeInfo.getMax(); + + if (!satisfy(comp, minVal, maxVal)) { + logger.debug("Prune segment {} due to given filter", seg); + return false; + } + } + + logger.debug("Pruner passed on segment {}", seg); + return true; + } + + private DimensionRangeInfo tryDeduceRangeFromPartitionCol(CubeSegment seg, TblColRef col) { + DataModelDesc model = seg.getModel(); + PartitionDesc part = model.getPartitionDesc(); + + if (!part.isPartitioned()) + return null; + if (!col.equals(part.getPartitionDateColumnRef())) + return null; + + // deduce the dim range from TSRange + TSRange tsRange = seg.getTSRange(); + if (tsRange.start.isMin || tsRange.end.isMax) + return null; // DimensionRangeInfo cannot express infinite + + String min = tsRangeToStr(tsRange.start.v, part); + String max = tsRangeToStr(tsRange.end.v - 1, part); // note the -1, end side is exclusive + return new DimensionRangeInfo(min, max); + } + + private String tsRangeToStr(long ts, PartitionDesc part) { + String value; + DataType partitionColType = part.getPartitionDateColumnRef().getType(); + if (partitionColType.isDate()) { + value = DateFormat.formatToDateStr(ts); + } else if (partitionColType.isTimeFamily()) { + value = DateFormat.formatToTimeWithoutMilliStr(ts); + } else if (partitionColType.isStringFamily() || partitionColType.isIntegerFamily()) {//integer like 20160101 + String partitionDateFormat = part.getPartitionDateFormat(); + if (StringUtils.isEmpty(partitionDateFormat)) { + value = "" + ts; + } else { + value = DateFormat.formatToDateStr(ts, partitionDateFormat); + } + } else { + throw new RuntimeException("Type " + partitionColType + " is not valid partition column type"); + } + return value; + } + + private boolean satisfy(CompareTupleFilter comp, String minVal, String maxVal) { + + TblColRef col = comp.getColumn(); + DataTypeOrder order = col.getType().getOrder(); + String filterVal = toString(comp.getFirstValue()); + + switch (comp.getOperator()) { + case EQ: + case IN: + String filterMin = order.min((Set<String>) comp.getValues()); + String filterMax = order.max((Set<String>) comp.getValues()); + return order.compare(filterMin, maxVal) <= 0 && order.compare(minVal, filterMax) <= 0; + case LT: + return order.compare(minVal, filterVal) < 0; + case LTE: + return order.compare(minVal, filterVal) <= 0; + case GT: + return order.compare(maxVal, filterVal) > 0; + case GTE: + return order.compare(maxVal, filterVal) >= 0; + case NEQ: + case NOTIN: + case ISNULL: + case ISNOTNULL: + default: + return true; + } + } + + private String toString(Object v) { + return v == null ? null : v.toString(); + } +} diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java index 811d512..97e4dc3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java @@ -29,7 +29,6 @@ import java.util.Set; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.metadata.expression.TupleExpression; @@ -49,14 +48,13 @@ public abstract class ScanRangePlannerBase { //GT protected GTInfo gtInfo; protected TupleFilter gtFilter; - protected Pair<ByteArray, ByteArray> gtStartAndEnd; - protected TblColRef gtPartitionCol; protected ImmutableBitSet gtDimensions; protected ImmutableBitSet gtAggrGroups; protected ImmutableBitSet gtAggrMetrics; protected String[] gtAggrFuncs; protected TupleFilter havingFilter; protected boolean isPartitionColUsingDatetimeEncoding = true; + protected int onlyShardId = -1; protected RecordComparator rangeStartComparator; protected RecordComparator rangeEndComparator; diff --git a/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java new file mode 100644 index 0000000..94acf9f --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.DataModelManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DimensionRangeInfoTest extends LocalFileMetadataTestCase { + + @Before + public void setUp() throws Exception { + this.createTestMetadata(); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + } + + @Test + public void testMergeRangeMap() { + DataModelDesc model = DataModelManager.getInstance(getTestConfig()).getDataModelDesc("ci_inner_join_model"); + String colId = "TEST_KYLIN_FACT.CAL_DT"; + + // normal merge + { + Map<String, DimensionRangeInfo> m1 = new HashMap<>(); + m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31")); + + Map<String, DimensionRangeInfo> m2 = new HashMap<>(); + m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30")); + + DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId); + Assert.assertEquals("2012-01-01", r1.getMin()); + Assert.assertEquals("2013-06-30", r1.getMax()); + } + + // missing column on one side + { + Map<String, DimensionRangeInfo> m1 = new HashMap<>(); + m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31")); + + Map<String, DimensionRangeInfo> m2 = new HashMap<>(); + + Assert.assertTrue(DimensionRangeInfo.mergeRangeMap(model, m1, m2).isEmpty()); + } + + // null min/max value (happens on empty segment, or all-null columns) + { + Map<String, DimensionRangeInfo> m1 = new HashMap<>(); + m1.put(colId, new DimensionRangeInfo(null, null)); + + Map<String, DimensionRangeInfo> m2 = new HashMap<>(); + m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30")); + + DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId); + Assert.assertEquals("2012-06-01", r1.getMin()); + Assert.assertEquals("2013-06-30", r1.getMax()); + } + + } +} diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java new file mode 100644 index 0000000..7bf7a87 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.common; + +import static org.apache.kylin.metadata.filter.TupleFilter.compare; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.common.util.SetAndUnsetSystemProp; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.metadata.filter.ConstantTupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter; +import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Sets; + +public class SegmentPrunerTest extends LocalFileMetadataTestCase { + private CubeInstance cube; + + @Before + public void setUp() { + this.createTestMetadata(); + cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("ssb_cube_with_dimention_range"); + } + + @After + public void after() { + this.cleanupTestMetadata(); + } + + @Test + public void testEmptySegment() { + CubeSegment seg = cube.getFirstSegment(); + TblColRef col = cube.getModel().findColumn("CUSTOMER.C_NATION"); + + // a normal hit + TupleFilter f = compare(col, FilterOperatorEnum.EQ, "CHINA"); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertTrue(segmentPruner.check(seg)); + + // make the segment empty, it should be pruned + seg.setInputRecords(0); + Assert.assertFalse(segmentPruner.check(seg)); + } + + @Test + public void testDimensionRangeCheck() { + CubeSegment cubeSegment = cube.getSegments().getFirstSegment(); + + //integer + TblColRef qtyCol = cube.getModel().findColumn("V_LINEORDER.LO_QUANTITY"); + TupleFilter constFilter_LO_QUANTITY0 = new ConstantTupleFilter(Sets.newHashSet("8", "18", "28"));//between min and max value + TupleFilter constFilter_LO_QUANTITY1 = new ConstantTupleFilter("1");//min value + TupleFilter constFilter_LO_QUANTITY2 = new ConstantTupleFilter("50");//max value + TupleFilter constFilter_LO_QUANTITY3 = new ConstantTupleFilter("0");//lt min value + TupleFilter constFilter_LO_QUANTITY4 = new ConstantTupleFilter("200");//gt max value + TupleFilter constFilter_LO_QUANTITY5 = new ConstantTupleFilter(Sets.newHashSet("51", "52", "53"));//gt max values + + // is null + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.ISNULL); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertTrue(segmentPruner.check(cubeSegment)); + } + + //lt min value + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY1); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertFalse(segmentPruner.check(cubeSegment)); + } + + //lte min value + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY1); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertTrue(segmentPruner.check(cubeSegment)); + } + + //lt max value + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY2); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertTrue(segmentPruner.check(cubeSegment)); + } + + //gt max value + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY2); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertFalse(segmentPruner.check(cubeSegment)); + } + + //gte max value + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY2); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertTrue(segmentPruner.check(cubeSegment)); + } + + //gt min value + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY1); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertTrue(segmentPruner.check(cubeSegment)); + } + + //in over-max values + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY5); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertFalse(segmentPruner.check(cubeSegment)); + } + + //in normal values + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY0); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertTrue(segmentPruner.check(cubeSegment)); + } + + //lte under-min value + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY3); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertFalse(segmentPruner.check(cubeSegment)); + } + + //gte over-max value + { + TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY4); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertFalse(segmentPruner.check(cubeSegment)); + } + } + + @Test + public void testLegacyCubeSeg() { + // legacy cube segments does not have DimensionRangeInfo, but with TSRange can do some pruning + CubeInstance cube = CubeManager.getInstance(getTestConfig()) + .getCube("test_kylin_cube_without_slr_left_join_ready_2_segments"); + + TblColRef col = cube.getModel().findColumn("TEST_KYLIN_FACT.CAL_DT"); + CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0); + TSRange tsRange = seg.getTSRange(); + long start = tsRange.start.v; + + try (SetAndUnsetSystemProp sns = new SetAndUnsetSystemProp("kylin.query.skip-empty-segments", "false")) { + { + TupleFilter f = compare(col, FilterOperatorEnum.LTE, start); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertTrue(segmentPruner.check(seg)); + } + { + TupleFilter f = compare(col, FilterOperatorEnum.LT, start); + SegmentPruner segmentPruner = new SegmentPruner(f); + Assert.assertFalse(segmentPruner.check(seg)); + } + } + } +} diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java index 16ea8ee..43b204a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; /** * @@ -92,7 +93,7 @@ public abstract class TupleFilter { public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object val) { CompareTupleFilter r = new CompareTupleFilter(op); r.addChild(new ColumnTupleFilter(col)); - r.addChild(new ConstantTupleFilter(val)); + r.addChild(val instanceof ConstantTupleFilter ? (ConstantTupleFilter) val : new ConstantTupleFilter(val)); return r; } @@ -309,6 +310,31 @@ public abstract class TupleFilter { } } + //find must true compareTupleFilter + public Set<CompareTupleFilter> findMustTrueCompareFilters() { + Set<CompareTupleFilter> result = Sets.newHashSet(); + findMustTrueCompareFilters(this, result); + return result; + } + + private void findMustTrueCompareFilters(TupleFilter filter, Set<CompareTupleFilter> result) { + if (filter instanceof CompareTupleFilter) { + if (((CompareTupleFilter) filter).getColumn() != null) { + result.add((CompareTupleFilter) filter); + } + return; + } + + if (filter instanceof LogicalTupleFilter) { + if (filter.getOperator() == FilterOperatorEnum.AND) { + for (TupleFilter child : filter.getChildren()) { + findMustTrueCompareFilters(child, result); + } + } + return; + } + } + public abstract boolean isEvaluable(); public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs); diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java new file mode 100644 index 0000000..7666ffe --- /dev/null +++ b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.datatype; + +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Set; + +public class DataTypeOrderTest { + @Test + public void testDataTypeOrder() { + DataType intType = DataType.getType("integer"); + DataTypeOrder dataTypeOrder = intType.getOrder(); + Set<String> integers = Sets.newHashSet("100000", "2", "1000", "100", "77", "10", "9", "2000000", "-10000", "0"); + Assert.assertEquals("2000000", dataTypeOrder.max(integers)); + Assert.assertEquals("-10000", dataTypeOrder.min(integers)); + + DataType doubleType = DataType.getType("double"); + dataTypeOrder = doubleType.getOrder(); + Set<String> doubels = Sets.newHashSet("1.1", "-299.5", "100000", "1.000", "4.000000001", "0.00", "-1000000.231231", "8000000", + "10", "10.00"); + Assert.assertEquals("8000000", dataTypeOrder.max(doubels)); + Assert.assertEquals("-1000000.231231", dataTypeOrder.min(doubels)); + + DataType datetimeType = DataType.getType("date"); + dataTypeOrder = datetimeType.getOrder(); + Set<String> datetimes = Sets.newHashSet("2010-01-02", "2888-08-09", "2018-05-26", "1527512082000", "2010-02-03 23:59:59", + "2000-12-12 12:00:00", "1970-01-19 00:18:32", "1998-12-02", "2018-05-28 10:00:00.255", "1995-09-20 20:00:00.220"); + Assert.assertEquals("2888-08-09", dataTypeOrder.max(datetimes)); + Assert.assertEquals("1970-01-19 00:18:32", dataTypeOrder.min(datetimes)); + + DataType stringType = new DataType("varchar", 256, 10); + dataTypeOrder = stringType.getOrder(); + Set<String> strings = Sets.newHashSet(null, "", "中国", "China No.1", "神兽麒麟", "Rocket", "Apache Kylin", "google", "NULL", + "empty"); + Assert.assertEquals("神兽麒麟", dataTypeOrder.max(strings)); + Assert.assertEquals("", dataTypeOrder.min(strings)); + } +} diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java index b1e8f62..95609eb 100644 --- a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java @@ -116,4 +116,27 @@ public class TupleFilterTest { r.put(col2, v2); return r; } + + @Test + public void testMustTrueTupleFilter() { + TupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND); + TupleFilter andFilter2 = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND); + TupleFilter orFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR); + andFilter.addChild(andFilter2); + andFilter.addChild(orFilter); + + Set<CompareTupleFilter> trueTupleFilters = andFilter.findMustTrueCompareFilters(); + Assert.assertTrue(trueTupleFilters.isEmpty()); + + TupleFilter compFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT); + compFilter.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test1", TblColRef.InnerDataTypeEnum.LITERAL))); + TupleFilter compFilter2 = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT); + compFilter2.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test2", TblColRef.InnerDataTypeEnum.LITERAL))); + andFilter2.addChild(compFilter); + orFilter.addChild(compFilter2); + Assert.assertEquals(Sets.newHashSet(compFilter), andFilter.findMustTrueCompareFilters()); + + Assert.assertEquals(Sets.newHashSet(compFilter2), compFilter2.findMustTrueCompareFilters()); + } + } diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java index 5f86a45..229ef01 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java @@ -31,7 +31,6 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.common.FuzzyValueCombination; import org.apache.kylin.cube.cuboid.Cuboid; @@ -39,7 +38,6 @@ import org.apache.kylin.cube.gridtable.CubeGridTable; import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.gridtable.RecordComparators; import org.apache.kylin.cube.gridtable.ScanRangePlannerBase; -import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd; import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.GTInfo; @@ -137,15 +135,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols); this.gtRtAggrMetrics = mapping.makeGridTableColumns(tmpRtAggrMetrics); - if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) { - int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef()); - if (index >= 0) { - SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo); - this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index); - this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index); - this.gtPartitionCol = gtInfo.colRef(index); - } - } + this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc())); + this.gtAggrMetrics = mapping.makeGridTableColumns(metrics); + this.gtAggrFuncs = mapping.makeAggrFuncs(metrics); } protected StorageContext context; @@ -153,7 +145,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { /** * Construct GTScanRangePlanner with incomplete information. For UT only. */ - public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) { + public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) { this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax(); this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax(); @@ -170,8 +162,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp); this.gtFilter = gtFilter; - this.gtStartAndEnd = gtStartAndEnd; - this.gtPartitionCol = gtPartitionCol; } public GTScanRequest planScanRequest() { @@ -237,18 +227,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase { Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap(); for (ColumnRange range : andDimRanges) { - if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) { - int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond()); - int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end); - - if ((isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare < 0) || (!isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare <= 0)) { - //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when using dict encoding, so use <= when has equals in condition. - } else { - logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", // - gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end)); - return null; - } - } int col = range.column.getColumnDesc().getZeroBasedIndex(); if (!gtInfo.getPrimaryKey().get(col)) diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 352a868..269833f 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -33,6 +33,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.RawQueryLastHacker; +import org.apache.kylin.cube.common.SegmentPruner; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt; @@ -87,14 +88,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { GTCubeStorageQueryRequest request = getStorageQueryRequest(context, sqlDigest, returnTupleInfo); List<CubeSegmentScanner> scanners = Lists.newArrayList(); - for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { + SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter); + for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) { CubeSegmentScanner scanner; - if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) { - logger.info("Skip cube segment {} because its input record is 0", cubeSeg); - continue; - } - scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), // request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), // request.getMetrics(), request.getDynFuncs(), // diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java index 073c12c..08bcb65 100644 --- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java +++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java @@ -32,7 +32,6 @@ import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.gridtable.CubeCodeSystem; import org.apache.kylin.dict.NumberDictionaryForestBuilder; import org.apache.kylin.dict.StringBytesConverter; @@ -121,13 +120,11 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { ByteArray segmentStart = enc(info, 0, "2015-01-14"); ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free - ByteArray segmentEnd = enc(info, 0, "2015-01-15"); assertEquals(segmentStart, segmentStartX); { LogicalTupleFilter filter = and(timeComp0, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size());//scan range are [close,close] assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); @@ -136,64 +133,57 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { } { LogicalTupleFilter filter = and(timeComp2, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); - assertEquals(0, r.size()); + assertEquals(1, r.size()); } { LogicalTupleFilter filter = and(timeComp4, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); - assertEquals(0, r.size()); + assertEquals(1, r.size()); } { LogicalTupleFilter filter = and(timeComp5, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); - assertEquals(0, r.size()); + assertEquals(1, r.size()); } { LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1), and(timeComp6, ageComp1)); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); - assertEquals(1, r.size()); - assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString()); + assertEquals(2, r.size()); + assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString()); assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]", - r.get(0).fuzzyKeys.toString()); + r.get(1).fuzzyKeys.toString()); } { LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1)); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString()); } { LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); - assertEquals(1, r.size()); - assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString()); - assertEquals(0, r.get(0).fuzzyKeys.size()); + assertEquals(2, r.size()); + assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString()); + assertEquals(0, r.get(1).fuzzyKeys.size()); } { //skip FALSE filter LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals(0, r.size()); } { //TRUE or FALSE filter LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size()); assertEquals("[null, null]-[null, null]", r.get(0).toString()); @@ -201,8 +191,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { { //TRUE or other filter LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size()); assertEquals("[null, null]-[null, null]", r.get(0).toString()); @@ -211,12 +200,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { @Test public void verifySegmentSkipping2() { - ByteArray segmentEnd = enc(info, 0, "2015-01-15"); - { LogicalTupleFilter filter = and(timeComp0, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size());//scan range are [close,close] assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString()); @@ -226,10 +212,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { { LogicalTupleFilter filter = and(timeComp5, ageComp1); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd), - info.colRef(0), filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter); List<GTScanRange> r = planner.planScanRanges(); - assertEquals(0, r.size());//scan range are [close,close] + assertEquals(1, r.size());//scan range are [close,close] } } @@ -239,7 +224,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { // flatten or-and & hbase fuzzy value { LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2)); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals(1, r.size()); assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString()); @@ -249,7 +234,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { // pre-evaluate ever false { LogicalTupleFilter filter = and(timeComp1, timeComp2); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals(0, r.size()); } @@ -257,7 +242,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { // pre-evaluate ever true { LogicalTupleFilter filter = or(timeComp1, ageComp4); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals("[[null, null]-[null, null]]", r.toString()); } @@ -265,7 +250,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { // merge overlap range { LogicalTupleFilter filter = or(timeComp1, timeComp3); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals("[[null, null]-[null, null]]", r.toString()); } @@ -274,7 +259,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase { { LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2), and(timeComp4, ageComp3)); - CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter); + CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter); List<GTScanRange> r = planner.planScanRanges(); assertEquals(3, r.size()); assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java index f3bdabd..4305a25 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java @@ -104,7 +104,6 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob { private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException { int hllShardBase = MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance()); - job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, hllShardBase); job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java index 9bede82..4ea04d5 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java @@ -55,8 +55,7 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); - reducerMapping = new FactDistinctColumnsReducerMapping(cube, - conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1)); + reducerMapping = new FactDistinctColumnsReducerMapping(cube); } @Override @@ -65,8 +64,6 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) { Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG); return reducerMapping.getReducerIdForCuboidRowCount(cuboidId); - } else if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL) { - return reducerMapping.getReducerIdForDatePartitionColumn(); } else { return BytesUtil.readUnsigned(key.getBytes(), 0, 1); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index f96944a..8f5d176 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -131,6 +131,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { throws IOException { FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance()); int numberOfReducers = reducerMapping.getTotalReducerNum(); + logger.info("{} has reducers {}.", this.getClass().getName(), numberOfReducers); if (numberOfReducers > 250) { throw new IllegalArgumentException( "The max reducer number for FactDistinctColumnsJob is 250, but now it is " @@ -141,7 +142,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { job.setReducerClass(FactDistinctColumnsReducer.class); job.setPartitionerClass(FactDistinctColumnPartitioner.class); job.setNumReduceTasks(numberOfReducers); - job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, reducerMapping.getCuboidRowCounterReducerNum()); // make each reducer output to respective dir MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index 569b810..fc9dc65 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -37,7 +37,6 @@ import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.measure.hllc.RegisterType; import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,9 +67,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB private static final Text EMPTY_TEXT = new Text(); - private int partitionColumnIndex = -1; - private boolean needFetchPartitionCol = true; - private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(); @Override @@ -96,18 +92,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE); } - TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); - if (partitionColRef != null) { - partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef); - } - - // check whether need fetch the partition col values - if (partitionColumnIndex < 0) { - // if partition col not on cube, no need - needFetchPartitionCol = false; - } else { - needFetchPartitionCol = true; - } //for KYLIN-2518 backward compatibility boolean isUsePutRowKeyToHllNewAlgorithm; if (KylinVersion.isBefore200(cubeDesc.getVersion())) { @@ -172,12 +156,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB for (String[] row : rowCollection) { context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row)); - for (int i = 0; i < dictCols.size(); i++) { - String fieldValue = row[dictionaryColumnIndex[i]]; + for (int i = 0; i < allDimDictCols.size(); i++) { + String fieldValue = row[columnIndex[i]]; if (fieldValue == null) continue; - int reducerIndex = reducerMapping.getReducerIdForDictCol(i, fieldValue); + int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue); tmpbuf.clear(); byte[] valueBytes = Bytes.toBytes(fieldValue); @@ -188,15 +172,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB tmpbuf.put(Bytes.toBytes(reducerIndex)[3]); tmpbuf.put(valueBytes); outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); - DataType type = dictCols.get(i).getType(); + DataType type = allDimDictCols.get(i).getType(); sortableKey.init(outputKey, type); - //judge type context.write(sortableKey, EMPTY_TEXT); // log a few rows for troubleshooting if (rowCount < 10) { logger.info( - "Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); + "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex); } } @@ -204,22 +187,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB putRowKeyToHLL(row); } - if (needFetchPartitionCol == true) { - String fieldValue = row[partitionColumnIndex]; - if (fieldValue != null) { - tmpbuf.clear(); - byte[] valueBytes = Bytes.toBytes(fieldValue); - int size = valueBytes.length + 1; - if (size >= tmpbuf.capacity()) { - tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size)); - } - tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL); - tmpbuf.put(valueBytes); - outputKey.set(tmpbuf.array(), 0, tmpbuf.position()); - sortableKey.init(outputKey, (byte) 0); - context.write(sortableKey, EMPTY_TEXT); - } - } rowCount++; } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index c66042b..ceddeb5 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -39,8 +39,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.metadata.model.TblColRef; -import com.google.common.collect.Lists; - /** */ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> { @@ -50,8 +48,8 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli protected CubeSegment cubeSeg; protected CubeDesc cubeDesc; protected long baseCuboidId; - protected List<TblColRef> dictCols; protected IMRTableInputFormat flatTableInputFormat; + protected List<TblColRef> allDimDictCols; protected Text outputKey = new Text(); //protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(); @@ -59,7 +57,7 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli protected int errorRecordCounter = 0; protected CubeJoinedFlatTableEnrich intermediateTableDesc; - protected int[] dictionaryColumnIndex; + protected int[] columnIndex; protected FactDistinctColumnsReducerMapping reducerMapping; @@ -74,20 +72,18 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID)); cubeDesc = cube.getDescriptor(); baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt()); + reducerMapping = new FactDistinctColumnsReducerMapping(cube); + allDimDictCols = reducerMapping.getAllDimDictCols(); flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc); - dictionaryColumnIndex = new int[dictCols.size()]; - for (int i = 0; i < dictCols.size(); i++) { - TblColRef colRef = dictCols.get(i); + columnIndex = new int[allDimDictCols.size()]; + for (int i = 0; i < allDimDictCols.size(); i++) { + TblColRef colRef = allDimDictCols.get(i); int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef); - dictionaryColumnIndex[i] = columnIndexOnFlatTbl; + columnIndex[i] = columnIndexOnFlatTbl; } - - reducerMapping = new FactDistinctColumnsReducerMapping(cube, - conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1)); } protected void handleErrorRecord(String[] record, Exception ex) throws IOException { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 801771a..61ba247 100755 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -35,7 +35,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -71,17 +70,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK private boolean isStatistics = false; private KylinConfig cubeConfig; private int taskId; - private boolean isPartitionCol = false; private int rowCount = 0; private FactDistinctColumnsReducerMapping reducerMapping; //local build dict private boolean buildDictInReducer; private IDictionaryBuilder builder; - private long timeMaxValue = Long.MIN_VALUE; - private long timeMinValue = Long.MAX_VALUE; + private String maxValue = null; + private String minValue = null; public static final String DICT_FILE_POSTFIX = ".rldict"; - public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci"; + public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci"; private MultipleOutputs mos; @@ -97,11 +95,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK cubeConfig = cube.getConfig(); cubeDesc = cube.getDescriptor(); - int numberOfTasks = context.getNumReduceTasks(); taskId = context.getTaskAttemptID().getTaskID().getId(); - reducerMapping = new FactDistinctColumnsReducerMapping(cube, - conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1)); + reducerMapping = new FactDistinctColumnsReducerMapping(cube); logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId)); @@ -114,18 +110,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK samplingPercentage = Integer .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); logger.info("Reducer " + taskId + " handling stats"); - } else if (reducerMapping.isPartitionColReducer(taskId)) { - // partition col - isPartitionCol = true; - col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); - if (col == null) { - logger.info("No partition col. This reducer will do nothing"); - } else { - logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity()); - } } else { // normal col - col = reducerMapping.getDictColForReducer(taskId); + col = reducerMapping.getColForReducer(taskId); Preconditions.checkNotNull(col); // local build dict @@ -133,7 +120,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder buildDictInReducer = false; } - if (reducerMapping.getReducerNumForDictCol(col) > 1) { + if (reducerMapping.getReducerNumForDimCol(col) > 1) { buildDictInReducer = false; // only works if this is the only reducer of a dictionary column } if (buildDictInReducer) { @@ -167,24 +154,29 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK cuboidHLLMap.put(cuboidId, hll); } } - } else if (isPartitionCol) { - // partition col + } else { String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); logAFewRows(value); - long time = DateFormat.stringToMillis(value); - timeMinValue = Math.min(timeMinValue, time); - timeMaxValue = Math.max(timeMaxValue, time); - } else { - // normal col - if (buildDictInReducer) { - String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1); - logAFewRows(value); - builder.addValue(value); - } else { - byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1); - // output written to baseDir/colName/-r-00000 (etc) - String fileName = col.getIdentity() + "/"; - mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName); + // if dimension col, compute max/min value + if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) { + if (minValue == null || col.getType().compare(minValue, value) > 0) { + minValue = value; + } + if (maxValue == null || col.getType().compare(maxValue, value) < 0) { + maxValue = value; + } + } + + //if dict column + if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) { + if (buildDictInReducer) { + builder.addValue(value); + } else { + byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1); + // output written to baseDir/colName/-r-00000 (etc) + String fileName = col.getIdentity() + "/"; + mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName); + } } } @@ -207,11 +199,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK logMapperAndCuboidStatistics(allCuboids); // for human check outputStatistics(allCuboids); - } else if (isPartitionCol) { - // partition col - outputPartitionInfo(); } else { - // normal col + //dimension col + if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) { + outputDimRangeInfo(); + } + // dic col if (buildDictInReducer) { Dictionary<String> dict = builder.build(); outputDict(col, dict); @@ -221,14 +214,17 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK mos.close(); } - private void outputPartitionInfo() throws IOException, InterruptedException { - if (col != null) { - // output written to baseDir/colName/colName.pci-r-00000 (etc) - String partitionFileName = col.getIdentity() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX; - - mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName); - mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName); - logger.info("write partition info for col : " + col.getName() + " minValue:" + timeMinValue + " maxValue:" + timeMaxValue); + private void outputDimRangeInfo() throws IOException, InterruptedException { + if (col != null && minValue != null) { + // output written to baseDir/colName/colName.dci-r-00000 (etc) + String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX; + + mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(minValue.getBytes()), + dimRangeFileName); + mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(maxValue.getBytes()), + dimRangeFileName); + logger.info("write dimension range info for col : " + col.getName() + " minValue:" + minValue + " maxValue:" + + maxValue); } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java index 51594c3..b60e4ce 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java @@ -18,73 +18,74 @@ package org.apache.kylin.engine.mr.steps; -import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.common.MapReduceUtil; import org.apache.kylin.metadata.model.TblColRef; +import com.google.common.collect.Lists; + /** * Reducers play different roles based on reducer-id: - * - (start from 0) one reducer for each dictionary column, UHC may have more than one reducer - * - one reducer to get min/max of date partition column + * - (start from 0) one reducer for each dimension column, dictionary column, UHC may have more than one reducer * - (at the end) one or more reducers to collect row counts for cuboids using HLL */ public class FactDistinctColumnsReducerMapping { - public static final int MARK_FOR_PARTITION_COL = -2; public static final int MARK_FOR_HLL_COUNTER = -1; - final private int nDictValueCollectors; - final private int datePartitionReducerId; final private int nCuboidRowCounters; + final private int nDimReducers; final private int nTotalReducers; - final private List<TblColRef> allDictCols; - final private int[] dictColIdToReducerBeginId; + final private List<TblColRef> allDimDictCols = Lists.newArrayList(); + final private int[] colIdToReducerBeginId; final private int[] reducerRolePlay; // >=0 for dict col id, <0 for partition col and hll counter (using markers) public FactDistinctColumnsReducerMapping(CubeInstance cube) { this(cube, 0); } - public FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) { + private FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) { CubeDesc desc = cube.getDescriptor(); + Set<TblColRef> allCols = cube.getAllColumns(); + Set<TblColRef> dictCols = desc.getAllColumnsNeedDictionaryBuilt(); + List<TblColRef> dimCols = desc.listDimensionColumnsExcludingDerived(true); + for (TblColRef colRef : allCols) { + if (dictCols.contains(colRef)) { + allDimDictCols.add(colRef); + } else if (dimCols.indexOf(colRef) >= 0){ + allDimDictCols.add(colRef); + } + } - allDictCols = new ArrayList(desc.getAllColumnsNeedDictionaryBuilt()); - - dictColIdToReducerBeginId = new int[allDictCols.size() + 1]; + colIdToReducerBeginId = new int[allDimDictCols.size() + 1]; int uhcReducerCount = cube.getConfig().getUHCReducerCount(); List<TblColRef> uhcList = desc.getAllUHCColumns(); int counter = 0; - for (int i = 0; i < allDictCols.size(); i++) { - dictColIdToReducerBeginId[i] = counter; - boolean isUHC = uhcList.contains(allDictCols.get(i)); + for (int i = 0; i < allDimDictCols.size(); i++) { + colIdToReducerBeginId[i] = counter; + boolean isUHC = uhcList.contains(allDimDictCols.get(i)); counter += (isUHC) ? uhcReducerCount : 1; } - - dictColIdToReducerBeginId[allDictCols.size()] = counter; - nDictValueCollectors = counter; - datePartitionReducerId = counter; + colIdToReducerBeginId[allDimDictCols.size()] = counter; + nDimReducers = counter; nCuboidRowCounters = cuboidRowCounterReducerNum == 0 ? // MapReduceUtil.getCuboidHLLCounterReducerNum(cube) : cuboidRowCounterReducerNum; - nTotalReducers = nDictValueCollectors + 1 + nCuboidRowCounters; + nTotalReducers = nDimReducers + nCuboidRowCounters; reducerRolePlay = new int[nTotalReducers]; for (int i = 0, dictId = 0; i < nTotalReducers; i++) { - if (i > datePartitionReducerId) { + if (i >= nDimReducers) { // cuboid HLL counter reducer reducerRolePlay[i] = MARK_FOR_HLL_COUNTER; - } else if (i == datePartitionReducerId) { - // date partition min/max reducer - reducerRolePlay[i] = MARK_FOR_PARTITION_COL; } else { - // dict value collector reducer - if (i == dictColIdToReducerBeginId[dictId + 1]) + if (i == colIdToReducerBeginId[dictId + 1]) dictId++; reducerRolePlay[i] = dictId; @@ -92,8 +93,8 @@ public class FactDistinctColumnsReducerMapping { } } - public List<TblColRef> getAllDictCols() { - return allDictCols; + public List<TblColRef> getAllDimDictCols() { + return allDimDictCols; } public int getTotalReducerNum() { @@ -104,9 +105,9 @@ public class FactDistinctColumnsReducerMapping { return nCuboidRowCounters; } - public int getReducerIdForDictCol(int dictColId, Object fieldValue) { - int begin = dictColIdToReducerBeginId[dictColId]; - int span = dictColIdToReducerBeginId[dictColId + 1] - begin; + public int getReducerIdForCol(int colId, Object fieldValue) { + int begin = colIdToReducerBeginId[colId]; + int span = colIdToReducerBeginId[colId + 1] - begin; if (span == 1) return begin; @@ -120,37 +121,29 @@ public class FactDistinctColumnsReducerMapping { } public int getRolePlayOfReducer(int reducerId) { - return reducerRolePlay[reducerId]; + return reducerRolePlay[reducerId % nTotalReducers]; } public boolean isCuboidRowCounterReducer(int reducerId) { return getRolePlayOfReducer(reducerId) == MARK_FOR_HLL_COUNTER; } - - public boolean isPartitionColReducer(int reducerId) { - return getRolePlayOfReducer(reducerId) == MARK_FOR_PARTITION_COL; - } - public TblColRef getDictColForReducer(int reducerId) { - int role = getRolePlayOfReducer(reducerId); + public TblColRef getColForReducer(int reducerId) { + int role = getRolePlayOfReducer(reducerId % nTotalReducers); if (role < 0) throw new IllegalStateException(); - return allDictCols.get(role); - } - - public int getReducerNumForDictCol(TblColRef col) { - int dictColId = allDictCols.indexOf(col); - return dictColIdToReducerBeginId[dictColId + 1] - dictColIdToReducerBeginId[dictColId]; + return allDimDictCols.get(role); } - public int getReducerIdForDatePartitionColumn() { - return datePartitionReducerId; + public int getReducerNumForDimCol(TblColRef col) { + int dictColId = allDimDictCols.indexOf(col); + return colIdToReducerBeginId[dictColId + 1] - colIdToReducerBeginId[dictColId]; } public int getReducerIdForCuboidRowCount(long cuboidId) { int rowCounterId = (int) (Math.abs(cuboidId) % nCuboidRowCounters); - return datePartitionReducerId + 1 + rowCounterId; + return nDimReducers + rowCounterId; } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index f749c80..fdb19db 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -23,15 +23,18 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.DimensionRangeInfo; import org.apache.kylin.cube.model.SnapshotTableDesc; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.LookupMaterializeContext; @@ -40,11 +43,14 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.datatype.DataTypeOrder; import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Sets; + /** */ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { @@ -76,9 +82,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { try { saveExtSnapshotIfNeeded(cubeManager, cube, segment); - if (segment.isOffsetCube()) { - updateTimeRange(segment); - } + updateSegment(segment); cubeManager.promoteNewlyBuiltSegments(cube, segment); return new ExecuteResult(); @@ -115,40 +119,51 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { } } - private void updateTimeRange(CubeSegment segment) throws IOException { + private void updateSegment(CubeSegment segment) throws IOException { final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); - if (partitionCol == null) { - return; - } - final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); - Path colDir = new Path(factColumnsInputPath, partitionCol.getIdentity()); - FileSystem fs = HadoopUtil.getWorkingFileSystem(); - Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir, - partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX); - if (outputFile == null) { - throw new IOException("fail to find the partition file in base dir: " + colDir); - } - - FSDataInputStream is = null; - BufferedReader bufferedReader = null; - InputStreamReader isr = null; - long minValue, maxValue; - try { - is = fs.open(outputFile); - isr = new InputStreamReader(is); - bufferedReader = new BufferedReader(isr); - minValue = Long.parseLong(bufferedReader.readLine()); - maxValue = Long.parseLong(bufferedReader.readLine()); - } finally { - IOUtils.closeQuietly(is); - IOUtils.closeQuietly(isr); - IOUtils.closeQuietly(bufferedReader); - } + for (TblColRef dimColRef : segment.getCubeDesc().listDimensionColumnsExcludingDerived(true)) { + final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); + Path colDir = new Path(factColumnsInputPath, dimColRef.getIdentity()); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + + //handle multiple reducers + Path[] outputFiles = HadoopUtil.getFilteredPath(fs, colDir, + dimColRef.getName() + FactDistinctColumnsReducer.DIMENSION_COL_INFO_FILE_POSTFIX); + if (outputFiles == null || outputFiles.length == 0) { + segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(null, null)); + continue; + } - logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue); - if (minValue != timeMinValue && maxValue != timeMaxValue) { - segment.setTSRange(new TSRange(minValue, maxValue + 1)); + FSDataInputStream is = null; + BufferedReader bufferedReader = null; + InputStreamReader isr = null; + Set<String> minValues = Sets.newHashSet(), maxValues = Sets.newHashSet(); + for (Path outputFile : outputFiles) { + try { + is = fs.open(outputFile); + isr = new InputStreamReader(is); + bufferedReader = new BufferedReader(isr); + minValues.add(bufferedReader.readLine()); + maxValues.add(bufferedReader.readLine()); + } finally { + IOUtils.closeQuietly(is); + IOUtils.closeQuietly(isr); + IOUtils.closeQuietly(bufferedReader); + } + } + DataTypeOrder order = dimColRef.getType().getOrder(); + String minValue = order.min(minValues); + String maxValue = order.max(maxValues); + logger.info("updateSegment step. {} minValue:" + minValue + " maxValue:" + maxValue, dimColRef.getName()); + + if (segment.isOffsetCube() && partitionCol != null && partitionCol.getIdentity().equals(dimColRef.getIdentity())) { + logger.info("update partition. {} timeMinValue:" + minValue + " timeMaxValue:" + maxValue, dimColRef.getName()); + if (DateFormat.stringToMillis(minValue) != timeMinValue && DateFormat.stringToMillis(maxValue) != timeMaxValue) { + segment.setTSRange(new TSRange(DateFormat.stringToMillis(minValue), DateFormat.stringToMillis(maxValue) + 1)); + } + } + segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(minValue, maxValue)); } } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java index e7aec8c..56f5077 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java @@ -20,10 +20,12 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.DimensionRangeInfo; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.exception.SegmentNotFoundException; import org.apache.kylin.job.exception.ExecuteException; @@ -66,8 +68,19 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { boolean isOffsetCube = mergedSegment.isOffsetCube(); Long tsStartMin = Long.MAX_VALUE, tsEndMax = 0L; + Map<String, DimensionRangeInfo> mergedSegDimRangeMap = null; for (String id : mergingSegmentIds) { CubeSegment segment = cube.getSegmentById(id); + Map<String, DimensionRangeInfo> segDimRangeMap = segment.getDimensionRangeInfoMap(); + if (segDimRangeMap.isEmpty()) { + continue; + } + if (mergedSegDimRangeMap == null) { + mergedSegDimRangeMap = segDimRangeMap; + } else { + mergedSegDimRangeMap = DimensionRangeInfo.mergeRangeMap(cube.getModel(), segDimRangeMap, + mergedSegDimRangeMap); + } sourceCount += segment.getInputRecords(); sourceSize += segment.getInputRecordsSize(); tsStartMin = Math.min(tsStartMin, segment.getTSRange().start.v); @@ -80,8 +93,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { mergedSegment.setInputRecordsSize(sourceSize); mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams())); mergedSegment.setLastBuildTime(System.currentTimeMillis()); + mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap); - if (isOffsetCube == true) { + if (isOffsetCube) { SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax); mergedSegment.setTSRange(tsRange); } diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java index a6bc019..3c58b26 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java @@ -60,24 +60,22 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest int totalReducerNum = mapping.getTotalReducerNum(); Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum()); - // check partition column reducer & cuboid row count reducers + // check cuboid row count reducers Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER, mapping.getRolePlayOfReducer(totalReducerNum - 1)); Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER, mapping.getRolePlayOfReducer(totalReducerNum - 2)); - Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL, - mapping.getRolePlayOfReducer(totalReducerNum - 3)); // check all dict column reducers - int dictEnd = totalReducerNum - 3; + int dictEnd = totalReducerNum - 2; for (int i = 0; i < dictEnd; i++) Assert.assertTrue(mapping.getRolePlayOfReducer(i) >= 0); // check a UHC dict column - Assert.assertEquals(2, mapping.getReducerNumForDictCol(aUHC)); + Assert.assertEquals(2, mapping.getReducerNumForDimCol(aUHC)); int uhcReducerBegin = -1; for (int i = 0; i < dictEnd; i++) { - if (mapping.getDictColForReducer(i).equals(aUHC)) { + if (mapping.getColForReducer(i).equals(aUHC)) { uhcReducerBegin = i; break; } @@ -86,7 +84,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest int[] allRolePlay = mapping.getAllRolePlaysForReducers(); Assert.assertEquals(allRolePlay[uhcReducerBegin], allRolePlay[uhcReducerBegin + 1]); for (int i = 0; i < 5; i++) { - int reducerId = mapping.getReducerIdForDictCol(uhcReducerBegin, i); + int reducerId = mapping.getReducerIdForCol(uhcReducerBegin, i); Assert.assertTrue(uhcReducerBegin <= reducerId && reducerId <= uhcReducerBegin + 1); } } diff --git a/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json new file mode 100644 index 0000000..124081c --- /dev/null +++ b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json @@ -0,0 +1,110 @@ +{ + "uuid" : "70a9f288-3c01-4745-a04b-5641e82d6c72", + "name" : "ssb_cube_with_dimention_range", + "owner" : "ADMIN", + "cost" : 50, + "status" : "DISABLED", + "segments" : [{ + "uuid": "f34e2a88-50e4-4c8c-8dd1-64e922ce8c37", + "name": "19920523163722_20180523173026", + "storage_location_identifier": "KYLIN_KFV177RJOS", + "date_range_start": 706639042000, + "date_range_end": 1527096626000, + "source_offset_start": 0, + "source_offset_end": 0, + "status": "READY", + "size_kb": 100, + "input_records": 1000, + "input_records_size": 102400, + "last_build_time": 1527068515334, + "last_build_job_id": "a5010166-b64a-4289-ac56-064640f7f2fa", + "create_time_utc": 1527067828660, + "total_shards": 0, + "blackout_cuboids": [], + "binary_signature": null, + "dimension_range_info_map": { + "PART.P_BRAND": { + "min": "MFGR#1101", + "max": "MFGR#5540" + }, + "V_LINEORDER.LO_QUANTITY": { + "min": "1", + "max": "50" + }, + "PART.P_MFGR": { + "min": "MFGR#1", + "max": "MFGR#5" + }, + "DATES.D_YEARMONTHNUM": { + "min": "199205", + "max": "199808" + }, + "CUSTOMER.C_NATION": { + "min": "ALGERIA", + "max": "VIETNAM" + }, + "DATES.D_YEARMONTH": { + "min": "Apr1993", + "max": "Sep1997" + }, + "CUSTOMER.C_CUSTKEY": { + "min": "1", + "max": "2999" + }, + "DATES.D_DATEKEY": { + "min": "19920523", + "max": "19980802" + }, + "SUPPLIER.S_SUPPKEY": { + "min": "1", + "max": "200" + }, + "SUPPLIER.S_REGION": { + "min": "AFRICA", + "max": "MIDDLE EAST" + }, + "PART.P_PARTKEY": { + "min": "1", + "max": "20000" + }, + "PART.P_CATEGORY": { + "min": "MFGR#11", + "max": "MFGR#55" + }, + "DATES.D_WEEKNUMINYEAR": { + "min": "1", + "max": "53" + }, + "CUSTOMER.C_CITY": { + "min": "ALGERIA 0", + "max": "VIETNAM 9" + }, + "SUPPLIER.S_NATION": { + "min": "ALGERIA", + "max": "VIETNAM" + }, + "SUPPLIER.S_CITY": { + "min": "ALGERIA 1", + "max": "VIETNAM 9" + }, + "CUSTOMER.C_REGION": { + "min": "AFRICA", + "max": "MIDDLE EAST" + }, + "DATES.D_YEAR": { + "min": "1992", + "max": "1998" + }, + "V_LINEORDER.LO_DISCOUNT": { + "min": "0", + "max": "10" + } + } + } ], + "last_modified" : 1457534216410, + "descriptor" : "ssb_cube_with_dimention_range", + "create_time_utc" : 1457444500888, + "size_kb" : 100, + "input_records_count" : 1000, + "input_records_size" : 102400 +} \ No newline at end of file diff --git a/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json new file mode 100644 index 0000000..d91b420 --- /dev/null +++ b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json @@ -0,0 +1,269 @@ +{ + "uuid" : "5c44df30-daec-486e-af90-927bf7851060", + "name" : "ssb_cube_with_dimention_range", + "description" : "", + "dimensions" : [ { + "name" : "PART.P_MFGR", + "table" : "PART", + "column" : "P_MFGR", + "derived" : null + }, { + "name" : "DATES.D_YEARMONTH", + "table" : "DATES", + "column" : "D_YEARMONTH", + "derived" : null + }, { + "name" : "SUPPLIER.S_NATION", + "table" : "SUPPLIER", + "column" : "S_NATION", + "derived" : null + }, { + "name" : "V_LINEORDER.LO_DISCOUNT", + "table" : "V_LINEORDER", + "column" : "LO_DISCOUNT", + "derived" : null + }, { + "name" : "CUSTOMER.C_CUSTKEY", + "table" : "CUSTOMER", + "column" : "C_CUSTKEY", + "derived" : null + }, { + "name" : "CUSTOMER.C_NATION", + "table" : "CUSTOMER", + "column" : "C_NATION", + "derived" : null + }, { + "name" : "SUPPLIER.S_REGION", + "table" : "SUPPLIER", + "column" : "S_REGION", + "derived" : null + }, { + "name" : "CUSTOMER.C_CITY", + "table" : "CUSTOMER", + "column" : "C_CITY", + "derived" : null + }, { + "name" : "SUPPLIER.S_SUPPKEY", + "table" : "SUPPLIER", + "column" : "S_SUPPKEY", + "derived" : null + }, { + "name" : "V_LINEORDER.LO_QUANTITY", + "table" : "V_LINEORDER", + "column" : "LO_QUANTITY", + "derived" : null + }, { + "name" : "PART.P_BRAND", + "table" : "PART", + "column" : "P_BRAND", + "derived" : null + }, { + "name" : "SUPPLIER.S_CITY", + "table" : "SUPPLIER", + "column" : "S_CITY", + "derived" : null + }, { + "name" : "PART.P_PARTKEY", + "table" : "PART", + "column" : "P_PARTKEY", + "derived" : null + }, { + "name" : "DATES.D_YEARMONTHNUM", + "table" : "DATES", + "column" : "D_YEARMONTHNUM", + "derived" : null + }, { + "name" : "DATES.D_WEEKNUMINYEAR", + "table" : "DATES", + "column" : "D_WEEKNUMINYEAR", + "derived" : null + }, { + "name" : "CUSTOMER.C_REGION", + "table" : "CUSTOMER", + "column" : "C_REGION", + "derived" : null + }, { + "name" : "PART.P_CATEGORY", + "table" : "PART", + "column" : "P_CATEGORY", + "derived" : null + }, { + "name" : "DATES.D_YEAR", + "table" : "DATES", + "column" : "D_YEAR", + "derived" : null + }, { + "name" : "DATES.D_DATEKEY", + "table" : "DATES", + "column" : "D_DATEKEY", + "derived" : null + } ], + "measures" : [ { + "name" : "_COUNT_", + "function" : { + "expression" : "COUNT", + "parameter" : { + "type" : "constant", + "value" : "1", + "next_parameter" : null + }, + "returntype" : "bigint" + }, + "dependent_measure_ref" : null + }, { + "name" : "TOTAL_REVENUE", + "function" : { + "expression" : "SUM", + "parameter" : { + "type" : "column", + "value" : "LO_REVENUE", + "next_parameter" : null + }, + "returntype" : "bigint" + }, + "dependent_measure_ref" : null + }, { + "name" : "TOTAL_SUPPLYCOST", + "function" : { + "expression" : "SUM", + "parameter" : { + "type" : "column", + "value" : "LO_SUPPLYCOST", + "next_parameter" : null + }, + "returntype" : "bigint" + }, + "dependent_measure_ref" : null + } ], + "rowkey" : { + "rowkey_columns" : [ { + "column" : "PART.P_MFGR", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "DATES.D_YEARMONTHNUM", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "CUSTOMER.C_REGION", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "V_LINEORDER.LO_QUANTITY", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "DATES.D_YEARMONTH", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "DATES.D_WEEKNUMINYEAR", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "SUPPLIER.S_REGION", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "V_LINEORDER.LO_DISCOUNT", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "CUSTOMER.C_CITY", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "PART.P_CATEGORY", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "DATES.D_YEAR", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "PART.P_BRAND", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "SUPPLIER.S_CITY", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "SUPPLIER.S_NATION", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "CUSTOMER.C_NATION", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "DATES.D_DATEKEY", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "SUPPLIER.S_SUPPKEY", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + }, { + "column" : "CUSTOMER.C_CUSTKEY", + "encoding" : "dict", + "isShardBy" : true, + "index" : "eq" + }, { + "column" : "PART.P_PARTKEY", + "encoding" : "dict", + "isShardBy" : false, + "index" : "eq" + } ] + }, + "signature" : "", + "last_modified" : 1457503036686, + "model_name" : "ssb", + "null_string" : null, + "hbase_mapping" : { + "column_family" : [ { + "name" : "F1", + "columns" : [ { + "qualifier" : "M", + "measure_refs" : [ "_COUNT_", "TOTAL_REVENUE", "TOTAL_SUPPLYCOST"] + } ] + } ] + }, + "aggregation_groups" : [ { + "includes" : [ "SUPPLIER.S_REGION", "CUSTOMER.C_NATION", "SUPPLIER.S_NATION", "CUSTOMER.C_REGION", "DATES.D_YEAR", "DATES.D_WEEKNUMINYEAR", "DATES.D_YEARMONTH", "DATES.D_YEARMONTHNUM", "SUPPLIER.S_CITY" ], + "select_rule" : { + "hierarchy_dims" : [ [ "S_REGION", "S_NATION", "S_CITY" ] ], + "mandatory_dims" : [ "DATES.D_YEAR" ], + "joint_dims" : [ ] + } + } ], + "notify_list" : [ ], + "status_need_notify" : [ ], + "partition_date_start" : 3153000000000, + "partition_date_end" : 3153600000000, + "auto_merge_time_ranges" : [ 604800000, 2419200000 ], + "retention_range" : 0, + "engine_type" : 2, + "storage_type" : 2, + "override_kylin_properties" : { + "kylin.storage.hbase.compression-codec" : "lz4", + "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true" + } +} \ No newline at end of file diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index afd9788..2f22bd4 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -18,15 +18,31 @@ package org.apache.kylin.provision; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; @@ -34,6 +50,7 @@ import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.DimensionRangeInfo; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.EngineFactory; @@ -48,6 +65,7 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.metadata.model.SegmentRange.TSRange; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.rest.job.StorageCleanupJob; import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; @@ -55,21 +73,9 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Method; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.TimeZone; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class BuildCubeWithEngine { @@ -304,21 +310,29 @@ public class BuildCubeWithEngine { if (!buildSegment(cubeName, date1, date2)) return false; + checkNormalSegRangeInfo(cubeManager.getCube(cubeName)); if (!buildSegment(cubeName, date2, date3)) return false; if (!optimizeCube(cubeName)) return false; + checkNormalSegRangeInfo(cubeManager.getCube(cubeName)); if (!buildSegment(cubeName, date3, date4)) return false; + checkNormalSegRangeInfo(cubeManager.getCube(cubeName)); if (!buildSegment(cubeName, date4, date5)) // one empty segment return false; + checkEmptySegRangeInfo(cubeManager.getCube(cubeName)); if (!buildSegment(cubeName, date5, date6)) // another empty segment return false; + checkEmptySegRangeInfo(cubeManager.getCube(cubeName)); + if (!mergeSegment(cubeName, date2, date4)) // merge 2 normal segments return false; + checkNormalSegRangeInfo(cubeManager.getCube(cubeName)); if (!mergeSegment(cubeName, date2, date5)) // merge normal and empty return false; + checkNormalSegRangeInfo(cubeManager.getCube(cubeName)); // now have 2 normal segments [date1, date2) [date2, date5) and 1 empty segment [date5, date6) return true; @@ -460,4 +474,40 @@ public class BuildCubeWithEngine { } } + private void checkEmptySegRangeInfo(CubeInstance cube) { + CubeSegment segment = getLastModifiedSegment(cube); + for (String colId : segment.getDimensionRangeInfoMap().keySet()) { + DimensionRangeInfo range = segment.getDimensionRangeInfoMap().get(colId); + if (!(range.getMax() == null && range.getMin() == null)) { + throw new RuntimeException("Empty segment must have null info."); + } + } + } + + private void checkNormalSegRangeInfo(CubeInstance cube) { + CubeSegment segment = getLastModifiedSegment(cube); + if (segment.getModel().getPartitionDesc().isPartitioned()) { + TblColRef colRef = segment.getModel().getPartitionDesc().getPartitionDateColumnRef(); + DimensionRangeInfo dmRangeInfo = segment.getDimensionRangeInfoMap().get(colRef.getIdentity()); + long min_v = DateFormat.stringToMillis(dmRangeInfo.getMin()); + long max_v = DateFormat.stringToMillis(dmRangeInfo.getMax()); + long ts_range_start = segment.getTSRange().start.v; + long ts_range_end = segment.getTSRange().end.v; + if (!(ts_range_start <= min_v && max_v <= ts_range_end -1)) { + throw new RuntimeException(String.format( + "Build cube failed, wrong partition column min/max value." + + " Segment: %s, min value: %s, TsRange.start: %s, max value: %s, TsRange.end: %s", + segment, min_v, ts_range_start, max_v, ts_range_end)); + } + } + } + + private CubeSegment getLastModifiedSegment(CubeInstance cube) { + return Collections.max(cube.getSegments(), new Comparator<CubeSegment>() { + @Override + public int compare(CubeSegment o1, CubeSegment o2) { + return Long.compare(o1.getLastBuildTime(), o2.getLastBuildTime()); + } + }); + } }
