KYLIN-2818 Refactor dateRange & sourceOffset on CubeSegment
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/75fbdcff Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/75fbdcff Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/75fbdcff Branch: refs/heads/2622-2764 Commit: 75fbdcffb0366cb555783a229d1a514d010abd87 Parents: a54f016 Author: Li Yang <liy...@apache.org> Authored: Fri Sep 1 15:18:36 2017 +0800 Committer: Hongbin Ma <m...@kyligence.io> Committed: Sat Sep 2 18:47:28 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../org/apache/kylin/common/util/ClassUtil.java | 1 - .../main/resources/kylin-defaults.properties | 1 + .../org/apache/kylin/cube/CubeInstance.java | 12 +- .../java/org/apache/kylin/cube/CubeManager.java | 164 ++++++-------- .../java/org/apache/kylin/cube/CubeSegment.java | 136 ++++++------ .../apache/kylin/cube/CubeSegmentAdvisor.java | 95 ++++++++ .../org/apache/kylin/cube/CubeValidator.java | 132 ----------- .../cube/gridtable/SegmentGTStartAndEnd.java | 11 +- .../org/apache/kylin/cube/model/CubeDesc.java | 2 +- .../cube/model/CubeJoinedFlatTableDesc.java | 14 +- .../cube/model/CubeJoinedFlatTableEnrich.java | 18 +- .../org/apache/kylin/cube/CubeManagerTest.java | 49 +++-- .../org/apache/kylin/cube/CubeSegmentsTest.java | 52 ++--- .../org/apache/kylin/job/JoinedFlatTable.java | 8 +- .../metadata/model/ComputedColumnDesc.java | 1 + .../metadata/model/IJoinedFlatTableDesc.java | 5 +- .../apache/kylin/metadata/model/ISegment.java | 21 +- .../kylin/metadata/model/ISegmentAdvisor.java | 35 +++ .../apache/kylin/metadata/model/JoinDesc.java | 7 +- .../kylin/metadata/model/MeasureDesc.java | 1 + .../kylin/metadata/model/PartitionDesc.java | 89 +++++--- .../kylin/metadata/model/SegmentRange.java | 220 +++++++++++++++++++ .../apache/kylin/metadata/model/Segments.java | 166 +++++++++++--- .../apache/kylin/metadata/model/TableRef.java | 1 + .../apache/kylin/metadata/model/TblColRef.java | 2 +- .../apache/kylin/source/SourcePartition.java | 57 ++--- .../DefaultPartitionConditionBuilderTest.java | 20 +- .../kylin/storage/translate/HBaseKeyRange.java | 2 +- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 7 +- .../engine/mr/steps/MergeCuboidMapperTest.java | 3 +- .../kylin/provision/BuildCubeWithEngine.java | 14 +- .../kylin/provision/BuildCubeWithStream.java | 19 +- .../.settings/org.eclipse.core.resources.prefs | 1 - .../kylin/rest/controller/CubeController.java | 40 ++-- .../rest/controller2/CubeControllerV2.java | 40 ++-- .../apache/kylin/rest/job/HybridCubeCLI.java | 16 +- .../apache/kylin/rest/service/CubeService.java | 13 +- .../apache/kylin/rest/service/JobService.java | 58 ++--- .../apache/kylin/rest/service/ModelService.java | 10 +- .../rest/controller/CubeControllerTest.java | 10 +- .../kylin/rest/service/CacheServiceTest.java | 3 +- .../apache/kylin/source/hive/HiveSource.java | 3 +- .../apache/kylin/source/jdbc/JdbcSource.java | 3 +- .../apache/kylin/source/kafka/KafkaSource.java | 22 +- .../kylin/source/kafka/job/MergeOffsetStep.java | 18 +- .../hbase/util/ExtendCubeToHybridCLI.java | 6 +- .../kylin/tool/ExtendCubeToHybridCLI.java | 6 +- 48 files changed, 965 insertions(+), 653 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e5fc3d6..9533400 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -335,6 +335,10 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.cube.cuboid-scheduler", "org.apache.kylin.cube.cuboid.DefaultCuboidScheduler"); } + public String getSegmentAdvisor() { + return getOptional("kylin.cube.segment-advisor", "org.apache.kylin.cube.CubeSegmentAdvisor"); + } + public double getJobCuboidSizeRatio() { return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java index 4486528..a54a24d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java @@ -66,7 +66,6 @@ public class ClassUtil { classRenameMap.put("org.apache.kylin.rest.util.KeywordDefaultDirtyHack", "org.apache.kylin.query.util.KeywordDefaultDirtyHack"); } - @SuppressWarnings("unchecked") public static <T> Class<? extends T> forName(String name, Class<T> clz) throws ClassNotFoundException { name = forRenamedClass(name); return (Class<? extends T>) Class.forName(name); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-common/src/main/resources/kylin-defaults.properties ---------------------------------------------------------------------- diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index 7c421f9..f0328fa 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -146,6 +146,7 @@ kylin.engine.mr.uhc-reducer-count=1 ### CUBE | DICTIONARY ### kylin.cube.cuboid-scheduler=org.apache.kylin.cube.cuboid.DefaultCuboidScheduler +kylin.cube.segment-advisor=org.apache.kylin.cube.CubeSegmentAdvisor # 'auto', 'inmem', 'layer' or 'random' for testing kylin.cube.algorithm=layer http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 4c57db8..fad942c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -26,13 +26,13 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TblColRef; @@ -99,11 +99,11 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, public CubeInstance() { } - public List<CubeSegment> getBuildingSegments() { + public Segments<CubeSegment> getBuildingSegments() { return segments.getBuildingSegments(); } - public List<CubeSegment> getMergingSegments(CubeSegment mergedSegment) { + public Segments<CubeSegment> getMergingSegments(CubeSegment mergedSegment) { return segments.getMergingSegments(mergedSegment); } @@ -337,12 +337,12 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, @Override public long getDateRangeStart() { - return segments.getDateRangeStart(); + return segments.getTSStart(); } @Override public long getDateRangeEnd() { - return segments.getDateRangeEnd(); + return segments.getTSEnd(); } @Override @@ -366,7 +366,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return this.getDescriptor().getAutoMergeTimeRanges() != null && this.getDescriptor().getAutoMergeTimeRanges().length > 0; } - public Pair<Long, Long> autoMergeCubeSegments() throws IOException { + public SegmentRange autoMergeCubeSegments() throws IOException { return segments.autoMergeCubeSegments(needAutoMerge(), getName(), getDescriptor().getAutoMergeTimeRanges()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index e089198..043993c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -55,6 +55,8 @@ import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; import org.apache.kylin.metadata.model.JoinDesc; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TableDesc; @@ -404,7 +406,7 @@ public class CubeManager implements IRealizationProvider { } Collections.sort(newSegs); - CubeValidator.validate(newSegs); + newSegs.validate(); cube.setSegments(newSegs); if (update.getStatus() != null) { @@ -454,39 +456,38 @@ public class CubeManager implements IRealizationProvider { // append a full build segment public CubeSegment appendSegment(CubeInstance cube) throws IOException { - return appendSegment(cube, 0, Long.MAX_VALUE, 0, 0, null, null); + return appendSegment(cube, null, null, null, null); } - public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate) throws IOException { - return appendSegment(cube, startDate, endDate, 0, 0, null, null); + public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException { + return appendSegment(cube, tsRange, null, null, null); } - public CubeSegment appendSegment(CubeInstance cube, SourcePartition sourcePartition) throws IOException { - return appendSegment(cube, sourcePartition.getStartDate(), sourcePartition.getEndDate(), - sourcePartition.getStartOffset(), sourcePartition.getEndOffset(), - sourcePartition.getSourcePartitionOffsetStart(), sourcePartition.getSourcePartitionOffsetEnd()); + public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException { + return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(), + src.getSourcePartitionOffsetEnd()); } - CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, + CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException { + checkInputRanges(tsRange, segRange); checkBuildingSegment(cube); // fix start/end a bit if (cube.getModel().getPartitionDesc().isPartitioned()) { // if missing start, set it to where last time ends - if (startDate == 0 && startOffset == 0 && cube.getLastSegment() != null) { - CubeSegment last = cube.getLastSegment(); - startDate = last.isSourceOffsetsOn() ? 0 : last.getDateRangeEnd(); - startOffset = last.isSourceOffsetsOn() ? last.getSourceOffsetEnd() : 0; + CubeSegment last = cube.getLastSegment(); + if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) { + tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v); } } else { // full build - startDate = startOffset = endOffset = 0; - endDate = Long.MAX_VALUE; + tsRange = null; + segRange = null; } - CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset); + CubeSegment newSegment = newSegment(cube, tsRange, segRange); newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart); newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd); validateNewSegments(cube, newSegment); @@ -497,22 +498,22 @@ public class CubeManager implements IRealizationProvider { return newSegment; } - public CubeSegment refreshSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) + public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException { + checkInputRanges(tsRange, segRange); checkBuildingSegment(cube); - CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset); + CubeSegment newSegment = newSegment(cube, tsRange, segRange); - Pair<Boolean, Boolean> pair = CubeValidator.fitInSegments(cube.getSegments(), newSegment); + Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment); if (pair.getFirst() == false || pair.getSecond() == false) throw new IllegalArgumentException("The new refreshing segment " + newSegment + " does not match any existing segment in cube " + cube); - if (startOffset > 0 || endOffset > 0) { + if (segRange != null) { CubeSegment toRefreshSeg = null; for (CubeSegment cubeSegment : cube.getSegments()) { - if (cubeSegment.getSourceOffsetStart() == startOffset - && cubeSegment.getSourceOffsetEnd() == endOffset) { + if (cubeSegment.getSegRange().equals(segRange)) { toRefreshSeg = cubeSegment; break; } @@ -533,61 +534,48 @@ public class CubeManager implements IRealizationProvider { return newSegment; } - public CubeSegment mergeSegments(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, - boolean force) throws IOException { + public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force) + throws IOException { if (cube.getSegments().isEmpty()) throw new IllegalArgumentException("Cube " + cube + " has no segments"); - if (startDate >= endDate && startOffset >= endOffset) - throw new IllegalArgumentException("Invalid merge range"); + checkInputRanges(tsRange, segRange); checkBuildingSegment(cube); checkCubeIsPartitioned(cube); - boolean isOffsetsOn = cube.getSegments().get(0).isSourceOffsetsOn(); - - if (isOffsetsOn) { + if (cube.getSegments().getFirstSegment().isOffsetCube()) { // offset cube, merge by date range? - if (startOffset == endOffset) { + if (segRange == null && tsRange != null) { Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY) - .findMergeOffsetsByDateRange(startDate, endDate, Long.MAX_VALUE); + .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE); if (pair == null) - throw new IllegalArgumentException("Find no segments to merge by date range " + startDate + "-" - + endDate + " for cube " + cube); - startOffset = pair.getFirst().getSourceOffsetStart(); - endOffset = pair.getSecond().getSourceOffsetEnd(); + throw new IllegalArgumentException("Find no segments to merge by " + tsRange + " for cube " + cube); + segRange = new SegmentRange(pair.getFirst().getSegRange().start, pair.getSecond().getSegRange().end); } - startDate = 0; - endDate = 0; + tsRange = null; + Preconditions.checkArgument(segRange != null); } else { - // date range cube, make sure range is on dates - if (startDate == endDate) { - startDate = startOffset; - endDate = endOffset; - } - startOffset = 0; - endOffset = 0; + segRange = null; + Preconditions.checkArgument(tsRange != null); } - CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset); + CubeSegment newSegment = newSegment(cube, tsRange, segRange); - List<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment); + Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment); if (mergingSegments.size() <= 1) - throw new IllegalArgumentException( - "Range " + newSegment.getSourceOffsetStart() + "-" + newSegment.getSourceOffsetEnd() - + " must contain at least 2 segments, but there is " + mergingSegments.size()); + throw new IllegalArgumentException("Range " + newSegment.getSegRange() + + " must contain at least 2 segments, but there is " + mergingSegments.size()); CubeSegment first = mergingSegments.get(0); CubeSegment last = mergingSegments.get(mergingSegments.size() - 1); - if (newSegment.isSourceOffsetsOn()) { - newSegment.setDateRangeStart(minDateRangeStart(mergingSegments)); - newSegment.setDateRangeEnd(maxDateRangeEnd(mergingSegments)); - newSegment.setSourceOffsetStart(first.getSourceOffsetStart()); - newSegment.setSourceOffsetEnd(last.getSourceOffsetEnd()); + if (first.isOffsetCube()) { + newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end)); newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart()); newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd()); + newSegment.setTSRange(null); } else { - newSegment.setDateRangeStart(first.getSourceOffsetStart()); - newSegment.setDateRangeEnd(last.getSourceOffsetEnd()); + newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd())); + newSegment.setSegRange(null); } if (force == false) { @@ -613,27 +601,10 @@ public class CubeManager implements IRealizationProvider { return newSegment; } - - public static long minDateRangeStart(List<CubeSegment> mergingSegments) { - long min = Long.MAX_VALUE; - for (CubeSegment seg : mergingSegments) - min = Math.min(min, seg.getDateRangeStart()); - return min; - } - - public static long maxDateRangeEnd(List<CubeSegment> mergingSegments) { - long max = Long.MIN_VALUE; - for (CubeSegment seg : mergingSegments) - max = Math.max(max, seg.getDateRangeEnd()); - return max; - } - - public CubeSegment getLatestSegment(CubeInstance cube) { - List<CubeSegment> existing = cube.getSegments(); - if (existing.isEmpty()) { - return null; - } else { - return existing.get(existing.size() - 1); + + private void checkInputRanges(TSRange tsRange, SegmentRange segRange) { + if (tsRange != null && segRange != null) { + throw new IllegalArgumentException("Build or refresh cube segment either by TSRange or by SegmentRange, not both."); } } @@ -690,15 +661,19 @@ public class CubeManager implements IRealizationProvider { } } - private CubeSegment newSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset) { + private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) { CubeSegment segment = new CubeSegment(); segment.setUuid(UUID.randomUUID().toString()); - segment.setName(CubeSegment.makeSegmentName(startDate, endDate, startOffset, endOffset)); + segment.setName(CubeSegment.makeSegmentName(tsRange, segRange)); segment.setCreateTimeUTC(System.currentTimeMillis()); - segment.setDateRangeStart(startDate); - segment.setDateRangeEnd(endDate); - segment.setSourceOffsetStart(startOffset); - segment.setSourceOffsetEnd(endOffset); + segment.setCubeInstance(cube); + + // let full build range be backward compatible + if (tsRange == null && segRange == null) + tsRange = new TSRange(0L, Long.MAX_VALUE); + + segment.setTSRange(tsRange); + segment.setSegRange(segRange); segment.setStatus(SegmentStatusEnum.NEW); segment.setStorageLocationIdentifier(generateStorageLocation()); @@ -724,10 +699,6 @@ public class CubeManager implements IRealizationProvider { return tableName; } - public Pair<Long, Long> autoMergeCubeSegments(CubeInstance cube) throws IOException { - return cube.autoMergeCubeSegments(); - } - public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier())) throw new IllegalStateException( @@ -923,25 +894,24 @@ public class CubeManager implements IRealizationProvider { } Collections.sort(segments); - boolean isOffsetOn = segments.get(0).isSourceOffsetsOn(); for (int i = 0; i < segments.size() - 1; ++i) { CubeSegment first = segments.get(i); CubeSegment second = segments.get(i + 1); - if (first.getSourceOffsetEnd() == second.getSourceOffsetStart()) { + if (first.getSegRange().connects(second.getSegRange())) continue; - } else if (first.getSourceOffsetEnd() < second.getSourceOffsetStart()) { + + if (first.getSegRange().apartBefore(second.getSegRange())) { CubeSegment hole = new CubeSegment(); - if (isOffsetOn == true) { - hole.setSourceOffsetStart(first.getSourceOffsetEnd()); + hole.setCubeInstance(cube); + if (first.isOffsetCube()) { + hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start)); hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd()); - hole.setSourceOffsetEnd(second.getSourceOffsetStart()); hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart()); + hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange())); } else { - hole.setDateRangeStart(first.getDateRangeEnd()); - hole.setDateRangeEnd(second.getDateRangeStart()); + hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v)); + hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null)); } - hole.setName(CubeSegment.makeSegmentName(hole.getDateRangeStart(), hole.getDateRangeEnd(), - hole.getSourceOffsetStart(), hole.getSourceOffsetEnd())); holes.add(hole); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index bb55c1a..358183e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -18,6 +18,7 @@ package org.apache.kylin.cube; +import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.LinkedHashMap; @@ -37,6 +38,9 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.metadata.model.ISegmentAdvisor; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TblColRef; @@ -50,8 +54,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegment, java.io.Serializable { +public class CubeSegment implements IBuildable, ISegment, Serializable { @JsonBackReference private CubeInstance cubeInstance; @@ -114,32 +119,27 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen private Map<String, String> additionalInfo = new LinkedHashMap<String, String>(); private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid + + // lazy init + transient ISegmentAdvisor advisor = null; public CubeDesc getCubeDesc() { return getCubeInstance().getDescriptor(); } - /** - * @param startDate - * @param endDate - * @return if(startDate == 0 && endDate == 0), returns "FULL_BUILD", else - * returns "yyyyMMddHHmmss_yyyyMMddHHmmss" - */ - public static String makeSegmentName(long startDate, long endDate, long startOffset, long endOffset) { - if (startOffset == 0 && startDate == 0 // - && (endOffset == 0 || endOffset == Long.MAX_VALUE) // - && (endDate == 0 || endDate == Long.MAX_VALUE)) { + public static String makeSegmentName(TSRange tsRange, SegmentRange segRange) { + if (tsRange == null && segRange == null) { return "FULL_BUILD"; } - if (startOffset != 0 || endOffset != 0) { - return startOffset + "_" + endOffset; + if (segRange != null) { + return segRange.start.v + "_" + segRange.end.v; } // using time SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); - return dateFormat.format(startDate) + "_" + dateFormat.format(endDate); + return dateFormat.format(tsRange.start.v) + "_" + dateFormat.format(tsRange.end.v); } // ============================================================================ @@ -164,22 +164,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen this.name = name; } - public long getDateRangeStart() { - return dateRangeStart; - } - - public void setDateRangeStart(long dateRangeStart) { - this.dateRangeStart = dateRangeStart; - } - - public long getDateRangeEnd() { - return dateRangeEnd; - } - - public void setDateRangeEnd(long dateRangeEnd) { - this.dateRangeEnd = dateRangeEnd; - } - public SegmentStatusEnum getStatus() { return status; } @@ -254,7 +238,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen return cubeInstance; } - public void setCubeInstance(CubeInstance cubeInstance) { + void setCubeInstance(CubeInstance cubeInstance) { this.cubeInstance = cubeInstance; } @@ -335,63 +319,89 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen return new CubeDimEncMap(this); } - public boolean isSourceOffsetsOn() { - return sourceOffsetStart != 0 || sourceOffsetEnd != 0; + // Hide the 4 confusing fields: dateRangeStart, dateRangeEnd, sourceOffsetStart, sourceOffsetEnd. + // They are now managed via SegmentRange and TSRange. + long _getDateRangeStart() { + return dateRangeStart; } - // date range is used in place of source offsets when offsets are missing - public long getSourceOffsetStart() { - return isSourceOffsetsOn() ? sourceOffsetStart : dateRangeStart; + void _setDateRangeStart(long dateRangeStart) { + this.dateRangeStart = dateRangeStart; } - public void setSourceOffsetStart(long sourceOffsetStart) { - this.sourceOffsetStart = sourceOffsetStart; + long _getDateRangeEnd() { + return dateRangeEnd; } - // date range is used in place of source offsets when offsets are missing - public long getSourceOffsetEnd() { - return isSourceOffsetsOn() ? sourceOffsetEnd : dateRangeEnd; + void _setDateRangeEnd(long dateRangeEnd) { + this.dateRangeEnd = dateRangeEnd; } - public void setSourceOffsetEnd(long sourceOffsetEnd) { - this.sourceOffsetEnd = sourceOffsetEnd; + long _getSourceOffsetStart() { + return sourceOffsetStart; } - // date range is used in place of source offsets when offsets are missing - public boolean sourceOffsetOverlaps(CubeSegment seg) { - return Segments.sourceOffsetOverlaps(this, seg); + void _setSourceOffsetStart(long sourceOffsetStart) { + this.sourceOffsetStart = sourceOffsetStart; } - // date range is used in place of source offsets when offsets are missing - public boolean sourceOffsetContains(ISegment seg) { - return Segments.sourceOffsetContains(this, seg); + long _getSourceOffsetEnd() { + return sourceOffsetEnd; } + void _setSourceOffsetEnd(long sourceOffsetEnd) { + this.sourceOffsetEnd = sourceOffsetEnd; + } + @Override - public void validate() throws IllegalStateException { - if (cubeInstance.getDescriptor().getModel().getPartitionDesc().isPartitioned()) { - if (!isSourceOffsetsOn() && dateRangeStart >= dateRangeEnd) - throw new IllegalStateException("Invalid segment, dateRangeStart(" + dateRangeStart + ") must be smaller than dateRangeEnd(" + dateRangeEnd + ") in segment " + this); - if (isSourceOffsetsOn() && sourceOffsetStart >= sourceOffsetEnd) - throw new IllegalStateException("Invalid segment, sourceOffsetStart(" + sourceOffsetStart + ") must be smaller than sourceOffsetEnd(" + sourceOffsetEnd + ") in segment " + this); + public SegmentRange getSegRange() { + return getAdvisor().getSegRange(); + } + + public void setSegRange(SegmentRange range) { + getAdvisor().setSegRange(range); + } + + @Override + public TSRange getTSRange() { + return getAdvisor().getTSRange(); + } + + public void setTSRange(TSRange range) { + getAdvisor().setTSRange(range); + } + + public boolean isOffsetCube() { + return getAdvisor().isOffsetCube(); + } + + private ISegmentAdvisor getAdvisor() { + if (advisor != null) + return advisor; + + synchronized (this) { + if (advisor == null) { + advisor = Segments.newSegmentAdvisor(this); + } + return advisor; } } + @Override + public void validate() throws IllegalStateException { + } + public String getProject() { return getCubeDesc().getProject(); } @Override - public int compareTo(CubeSegment other) { - long comp = this.getSourceOffsetStart() - other.getSourceOffsetStart(); + public int compareTo(ISegment other) { + int comp = this.getSegRange().start.compareTo(other.getSegRange().start); if (comp != 0) - return comp < 0 ? -1 : 1; + return comp; - comp = this.getSourceOffsetEnd() - other.getSourceOffsetEnd(); - if (comp != 0) - return comp < 0 ? -1 : 1; - else - return 0; + return this.getSegRange().end.compareTo(other.getSegRange().end); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeSegmentAdvisor.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegmentAdvisor.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegmentAdvisor.java new file mode 100644 index 0000000..584cf4e --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegmentAdvisor.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube; + +import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.metadata.model.ISegmentAdvisor; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; + +public class CubeSegmentAdvisor implements ISegmentAdvisor { + + private final CubeSegment seg; + + // these are just cache of segment attributes, all changes must write through to 'seg' + private TSRange tsRange; + private SegmentRange segRange; + + public CubeSegmentAdvisor(ISegment segment) { + this.seg = (CubeSegment) segment; + } + + @Override + public boolean isOffsetCube() { + return seg._getSourceOffsetStart() != 0 || seg._getSourceOffsetEnd() != 0; + } + + @Override + public SegmentRange getSegRange() { + if (segRange != null) + return segRange; + + // backward compatible with pre-streaming metadata, TSRange can imply SegmentRange + segRange = isOffsetCube() // + ? new SegmentRange(seg._getSourceOffsetStart(), seg._getSourceOffsetEnd()) // + : getTSRange(); + + return segRange; + } + + @Override + public void setSegRange(SegmentRange range) { + // backward compatible with pre-streaming metadata, TSRange can imply SegmentRange + if (range == null) { + seg._setSourceOffsetStart(0); + seg._setSourceOffsetEnd(0); + } else { + seg._setSourceOffsetStart((Long) range.start.v); + seg._setSourceOffsetEnd((Long) range.end.v); + } + clear(); + } + + @Override + public TSRange getTSRange() { + if (tsRange != null) + return tsRange; + + tsRange = new TSRange(seg._getDateRangeStart(), seg._getDateRangeEnd()); + return tsRange; + } + + @Override + public void setTSRange(TSRange range) { + if (range == null) { + seg._setDateRangeStart(0); + seg._setDateRangeEnd(0); + } else { + seg._setDateRangeStart(range.start.v); + seg._setDateRangeEnd(range.end.v); + } + clear(); + } + + private void clear() { + tsRange = null; + segRange = null; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeValidator.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeValidator.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeValidator.java deleted file mode 100644 index f94752f..0000000 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeValidator.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.cube; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -public class CubeValidator { - private static final Logger logger = LoggerFactory.getLogger(CubeValidator.class); - - /** - * Validates: - * - consistent isOffsetsOn() - * - for all ready segments, sourceOffset MUST have no overlaps, SHOULD have no holes - * - for all new segments, sourceOffset MUST have no overlaps, MUST contain a ready segment if overlaps with it - * - for all new segments, sourceOffset SHOULD fit/connect another segments - * - dateRange does not matter any more - */ - public static void validate(Collection<CubeSegment> segments) { - if (segments == null || segments.isEmpty()) - return; - - // make a copy, don't modify given list - List<CubeSegment> all = Lists.newArrayList(segments); - Collections.sort(all); - - // check consistent isOffsetsOn() - boolean isOffsetsOn = all.get(0).isSourceOffsetsOn(); - for (CubeSegment seg : all) { - seg.validate(); - if (seg.isSourceOffsetsOn() != isOffsetsOn) - throw new IllegalStateException("Inconsistent isOffsetsOn for segment " + seg); - } - - List<CubeSegment> ready = Lists.newArrayListWithCapacity(all.size()); - List<CubeSegment> news = Lists.newArrayListWithCapacity(all.size()); - for (CubeSegment seg : all) { - if (seg.getStatus() == SegmentStatusEnum.READY) - ready.add(seg); - else - news.add(seg); - } - - // for all ready segments, sourceOffset MUST have no overlaps, SHOULD have no holes - CubeSegment pre = null; - for (CubeSegment seg : ready) { - if (pre != null) { - if (pre.sourceOffsetOverlaps(seg)) - throw new IllegalStateException("Segments overlap: " + pre + " and " + seg); - if (pre.getSourceOffsetEnd() < seg.getSourceOffsetStart()) - logger.warn("Hole between adjacent READY segments " + pre + " and " + seg); - } - pre = seg; - } - - // for all other segments, sourceOffset MUST have no overlaps, MUST contain a ready segment if overlaps with it - pre = null; - for (CubeSegment seg : news) { - if (pre != null) { - if (pre.sourceOffsetOverlaps(seg)) - throw new IllegalStateException("Segments overlap: " + pre + " and " + seg); - } - pre = seg; - - for (CubeSegment aReady : ready) { - if (seg.sourceOffsetOverlaps(aReady) && !seg.sourceOffsetContains(aReady)) - throw new IllegalStateException("Segments overlap: " + aReady + " and " + seg); - } - } - - // for all other segments, sourceOffset SHOULD fit/connect other segments - for (CubeSegment seg : news) { - Pair<Boolean, Boolean> pair = fitInSegments(all, seg); - boolean startFit = pair.getFirst(); - boolean endFit = pair.getSecond(); - - if (!startFit) - logger.warn("NEW segment start does not fit/connect with other segments: " + seg); - if (!endFit) - logger.warn("NEW segment end does not fit/connect with other segments: " + seg); - } - } - - public static Pair<Boolean, Boolean> fitInSegments(List<CubeSegment> segments, CubeSegment newOne) { - if (segments == null || segments.isEmpty()) - return null; - - CubeSegment first = segments.get(0); - CubeSegment last = segments.get(segments.size() - 1); - long start = newOne.getSourceOffsetStart(); - long end = newOne.getSourceOffsetEnd(); - boolean startFit = false; - boolean endFit = false; - for (CubeSegment sss : segments) { - if (sss == newOne) - continue; - startFit = startFit || (start == sss.getSourceOffsetStart() || start == sss.getSourceOffsetEnd()); - endFit = endFit || (end == sss.getSourceOffsetStart() || end == sss.getSourceOffsetEnd()); - } - if (!startFit && endFit && newOne == first) - startFit = true; - if (!endFit && startFit && newOne == last) - endFit = true; - - return Pair.newPair(startFit, endFit); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java index 8ceb841..7ecbbd6 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java @@ -28,6 +28,7 @@ import org.apache.kylin.dimension.AbstractDateDimEnc; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; public class SegmentGTStartAndEnd { private ISegment segment; @@ -43,16 +44,18 @@ public class SegmentGTStartAndEnd { } public Pair<ByteArray, ByteArray> getSegmentStartAndEnd(int index) { + TSRange tsRange = segment.getTSRange(); + ByteArray start; - if (segment.getDateRangeStart() != Long.MIN_VALUE) { - start = encodeTime(segment.getDateRangeStart(), index, 1); + if (!tsRange.start.isMin) { + start = encodeTime(tsRange.start.v, index, 1); } else { start = new ByteArray(); } ByteArray end; - if (segment.getDateRangeEnd() != Long.MAX_VALUE) { - end = encodeTime(segment.getDateRangeEnd(), index, -1); + if (!tsRange.end.isMax) { + end = encodeTime(tsRange.end.v, index, -1); } else { end = new ByteArray(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 0e22587..be523ed 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -189,7 +189,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { @JsonInclude(JsonInclude.Include.NON_NULL) private int parentForward = 3; - // cuboid scheduler lazy built + // lazy init transient private CuboidScheduler cuboidScheduler = null; public boolean isEnableSharding() { http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index c49d37a..f8b039a 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -18,6 +18,7 @@ package org.apache.kylin.cube.model; +import java.io.Serializable; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TblColRef; import com.google.common.base.Preconditions; @@ -36,7 +38,8 @@ import com.google.common.collect.Maps; /** */ -public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, java.io.Serializable { +@SuppressWarnings("serial") +public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializable { protected final String tableName; protected final CubeDesc cubeDesc; @@ -146,13 +149,8 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, java.io.Se } @Override - public long getSourceOffsetStart() { - return cubeSegment.getSourceOffsetStart(); - } - - @Override - public long getSourceOffsetEnd() { - return cubeSegment.getSourceOffsetEnd(); + public SegmentRange getSegRange() { + return cubeSegment.getSegRange(); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index e829aeb..223df7c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -18,20 +18,23 @@ package org.apache.kylin.cube.model; +import java.io.Serializable; +import java.util.List; + import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TblColRef; -import java.util.List; - /** * An enrich of IJoinedFlatTableDesc for cubes */ -public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, java.io.Serializable { +@SuppressWarnings("serial") +public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, Serializable { private CubeDesc cubeDesc; private IJoinedFlatTableDesc flatDesc; @@ -113,13 +116,8 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, java.io. } @Override - public long getSourceOffsetStart() { - return flatDesc.getSourceOffsetStart(); - } - - @Override - public long getSourceOffsetEnd() { - return flatDesc.getSourceOffsetEnd(); + public SegmentRange getSegRange() { + return flatDesc.getSegRange(); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java index 3cae37d..7dd7212 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java @@ -30,8 +30,9 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; @@ -108,10 +109,10 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { assertEquals(0, cube.getSegments().size()); // append first - CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0, null, null); + CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L), null, null, null); seg1.setStatus(SegmentStatusEnum.READY); - CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0, null, null); + CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L), null, null, null); seg2.setStatus(SegmentStatusEnum.READY); CubeUpdate cubeBuilder = new CubeUpdate(cube); @@ -120,7 +121,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { assertEquals(2, cube.getSegments().size()); - Pair<Long, Long> mergedSeg = mgr.autoMergeCubeSegments(cube); + SegmentRange mergedSeg = cube.autoMergeCubeSegments(); assertTrue(mergedSeg != null); @@ -145,25 +146,25 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { m4.put(1, 4000L); // append first - CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000, null, m1); + CubeSegment seg1 = mgr.appendSegment(cube, null, new SegmentRange(0L, 1000L), null, m1); seg1.setStatus(SegmentStatusEnum.READY); - CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000, m1, m2); + CubeSegment seg2 = mgr.appendSegment(cube, null, new SegmentRange(1000L, 2000L), m1, m2); seg2.setStatus(SegmentStatusEnum.READY); - CubeSegment seg3 = mgr.mergeSegments(cube, 0, 0, 0000, 2000, true); + CubeSegment seg3 = mgr.mergeSegments(cube, null, new SegmentRange(0L, 2000L), true); seg3.setStatus(SegmentStatusEnum.NEW); - CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000, m2, m3); + CubeSegment seg4 = mgr.appendSegment(cube, null, new SegmentRange(2000L, 3000L), m2, m3); seg4.setStatus(SegmentStatusEnum.NEW); seg4.setLastBuildJobID("test"); seg4.setStorageLocationIdentifier("test"); - CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000, m3, m4); + CubeSegment seg5 = mgr.appendSegment(cube, null, new SegmentRange(3000L, 4000L), m3, m4); seg5.setStatus(SegmentStatusEnum.READY); CubeUpdate cubeBuilder = new CubeUpdate(cube); @@ -202,26 +203,26 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { m4.put(1, 4000L); // append first - CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000, null, m1); + CubeSegment seg1 = mgr.appendSegment(cube, null, new SegmentRange(0L, 1000L), null, m1); seg1.setStatus(SegmentStatusEnum.READY); - CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000, m1, m2); + CubeSegment seg2 = mgr.appendSegment(cube, null, new SegmentRange(1000L, 2000L), m1, m2); seg2.setStatus(SegmentStatusEnum.READY); - CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000, m2, m3); + CubeSegment seg3 = mgr.appendSegment(cube, null, new SegmentRange(2000L, 3000L), m2, m3); seg3.setStatus(SegmentStatusEnum.READY); - CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000, m3, m4); + CubeSegment seg4 = mgr.appendSegment(cube, null, new SegmentRange(3000L, 4000L), m3, m4); seg4.setStatus(SegmentStatusEnum.READY); - CubeSegment merge1 = mgr.mergeSegments(cube, 0, 0, 0, 2000, true); + CubeSegment merge1 = mgr.mergeSegments(cube, null, new SegmentRange(0L, 2000L), true); merge1.setStatus(SegmentStatusEnum.NEW); merge1.setLastBuildJobID("test"); merge1.setStorageLocationIdentifier("test"); - CubeSegment merge2 = mgr.mergeSegments(cube, 0, 0, 2000, 4000, true); + CubeSegment merge2 = mgr.mergeSegments(cube, null, new SegmentRange(2000L, 4000L), true); merge2.setStatus(SegmentStatusEnum.NEW); merge2.setLastBuildJobID("test"); merge2.setStorageLocationIdentifier("test"); @@ -267,41 +268,41 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { assertEquals(0, cube.getSegments().size()); // append first - CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000); + CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L)); seg1.setStatus(SegmentStatusEnum.READY); - CubeSegment seg3 = mgr.appendSegment(cube, 2000, 4000); + CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 4000L)); seg3.setStatus(SegmentStatusEnum.READY); assertEquals(2, cube.getSegments().size()); - Pair<Long, Long> mergedSeg = mgr.autoMergeCubeSegments(cube); + SegmentRange mergedSeg = cube.autoMergeCubeSegments(); assertTrue(mergedSeg == null); // append a new seg which will be merged - CubeSegment seg4 = mgr.appendSegment(cube, 4000, 8000); + CubeSegment seg4 = mgr.appendSegment(cube, new TSRange(4000L, 8000L)); seg4.setStatus(SegmentStatusEnum.READY); assertEquals(3, cube.getSegments().size()); - mergedSeg = mgr.autoMergeCubeSegments(cube); + mergedSeg = cube.autoMergeCubeSegments(); assertTrue(mergedSeg != null); - assertTrue(mergedSeg.getFirst() == 2000 && mergedSeg.getSecond() == 8000); + assertTrue((Long) mergedSeg.start.v == 2000 && (Long) mergedSeg.end.v == 8000); // fill the gap - CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000); + CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L)); seg2.setStatus(SegmentStatusEnum.READY); assertEquals(4, cube.getSegments().size()); - mergedSeg = mgr.autoMergeCubeSegments(cube); + mergedSeg = cube.autoMergeCubeSegments(); assertTrue(mergedSeg != null); - assertTrue(mergedSeg.getFirst() == 0 && mergedSeg.getSecond() == 8000); + assertTrue((Long) mergedSeg.start.v == 0 && (Long) mergedSeg.end.v == 8000); } public CubeDescManager getCubeDescManager() { http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java index de91dd2..64c6d68 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java @@ -25,6 +25,8 @@ import java.io.IOException; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.junit.After; import org.junit.Before; @@ -52,10 +54,8 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { // first append, creates a new & single segment CubeSegment seg = mgr.appendSegment(cube); - assertEquals(0, seg.getDateRangeStart()); - assertEquals(Long.MAX_VALUE, seg.getDateRangeEnd()); - assertEquals(0, seg.getSourceOffsetStart()); - assertEquals(Long.MAX_VALUE, seg.getSourceOffsetEnd()); + assertEquals(new TSRange(0L, Long.MAX_VALUE), seg.getTSRange()); + assertEquals(new TSRange(0L, Long.MAX_VALUE), seg.getSegRange()); assertEquals(1, cube.getSegments().size()); // second append, throw IllegalStateException because the first segment is not built @@ -82,15 +82,13 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { // append again, for non-partitioned cube, it becomes a full refresh CubeSegment seg2 = mgr.appendSegment(cube); - assertEquals(0, seg2.getDateRangeStart()); - assertEquals(Long.MAX_VALUE, seg2.getDateRangeEnd()); - assertEquals(0, seg2.getSourceOffsetStart()); - assertEquals(Long.MAX_VALUE, seg2.getSourceOffsetEnd()); + assertEquals(new TSRange(0L, Long.MAX_VALUE), seg2.getTSRange()); + assertEquals(new TSRange(0L, Long.MAX_VALUE), seg2.getSegRange()); assertEquals(2, cube.getSegments().size()); // non-partitioned cannot merge, throw exception try { - mgr.mergeSegments(cube, 0, 0, 0, Long.MAX_VALUE, false); + mgr.mergeSegments(cube, null, new SegmentRange(0L, Long.MAX_VALUE), false); fail(); } catch (IllegalStateException ex) { // good @@ -106,28 +104,24 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { assertEquals(0, cube.getSegments().size()); // append first - CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000); + CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L)); seg1.setStatus(SegmentStatusEnum.READY); // append second - CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000); + CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L)); assertEquals(2, cube.getSegments().size()); - assertEquals(1000, seg2.getDateRangeStart()); - assertEquals(2000, seg2.getDateRangeEnd()); - assertEquals(1000, seg2.getSourceOffsetStart()); - assertEquals(2000, seg2.getSourceOffsetEnd()); + assertEquals(new TSRange(1000L, 2000L), seg2.getTSRange()); + assertEquals(new TSRange(1000L, 2000L), seg2.getSegRange()); assertEquals(SegmentStatusEnum.NEW, seg2.getStatus()); seg2.setStatus(SegmentStatusEnum.READY); // merge first and second - CubeSegment merge = mgr.mergeSegments(cube, 0, 2000, 0, 0, true); + CubeSegment merge = mgr.mergeSegments(cube, new TSRange(0L, 2000L), null, true); assertEquals(3, cube.getSegments().size()); - assertEquals(0, merge.getDateRangeStart()); - assertEquals(2000, merge.getDateRangeEnd()); - assertEquals(0, merge.getSourceOffsetStart()); - assertEquals(2000, merge.getSourceOffsetEnd()); + assertEquals(new TSRange(0L, 2000L), merge.getTSRange()); + assertEquals(new TSRange(0L, 2000L), merge.getSegRange()); assertEquals(SegmentStatusEnum.NEW, merge.getStatus()); // segments are strictly ordered @@ -140,18 +134,16 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { // try merge at start/end at middle of segments try { - mgr.mergeSegments(cube, 500, 2500, 0, 0, true); + mgr.mergeSegments(cube, new TSRange(500L, 2500L), null, true); fail(); } catch (IllegalArgumentException ex) { // good } - CubeSegment merge2 = mgr.mergeSegments(cube, 0, 2500, 0, 0, true); + CubeSegment merge2 = mgr.mergeSegments(cube, new TSRange(0L, 2500L), null, true); assertEquals(3, cube.getSegments().size()); - assertEquals(0, merge2.getDateRangeStart()); - assertEquals(2000, merge2.getDateRangeEnd()); - assertEquals(0, merge2.getSourceOffsetStart()); - assertEquals(2000, merge2.getSourceOffsetEnd()); + assertEquals(new TSRange(0L, 2000L), merge2.getTSRange()); + assertEquals(new TSRange(0L, 2000L), merge2.getSegRange()); } @Test @@ -164,25 +156,25 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { assertEquals(0, cube.getSegments().size()); // append the first - CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000); + CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L)); seg1.setStatus(SegmentStatusEnum.READY); assertEquals(1, cube.getSegments().size()); // append the third - CubeSegment seg3 = mgr.appendSegment(cube, 2000, 3000); + CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 3000L)); seg3.setStatus(SegmentStatusEnum.READY); assertEquals(2, cube.getSegments().size()); // reject overlap try { - mgr.appendSegment(cube, 1000, 2500); + mgr.appendSegment(cube, new TSRange(1000L, 2500L)); fail(); } catch (IllegalStateException ex) { // good } // append the second - CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000); + CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L)); seg2.setStatus(SegmentStatusEnum.READY); assertEquals(3, cube.getSegments().size()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index acb29e1..8dd5093 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -33,6 +33,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; @@ -216,12 +217,11 @@ public class JoinedFlatTable { if (flatDesc.getSegment() != null) { PartitionDesc partDesc = model.getPartitionDesc(); if (partDesc != null && partDesc.getPartitionDateColumn() != null) { - long dateStart = flatDesc.getSourceOffsetStart(); - long dateEnd = flatDesc.getSourceOffsetEnd(); + SegmentRange segRange = flatDesc.getSegRange(); - if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) { + if (segRange != null && !segRange.isInfinite()) { whereBuilder.append(hasCondition ? " AND (" : " ("); - whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd)); + whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, segRange)); whereBuilder.append(")" + sep); hasCondition = true; } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java index bd785c8..ea423ae 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java @@ -38,6 +38,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class ComputedColumnDesc implements Serializable { private static final Logger logger = LoggerFactory.getLogger(ComputedColumnDesc.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java index b545e50..0589829 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java @@ -32,14 +32,13 @@ public interface IJoinedFlatTableDesc { int getColumnIndex(TblColRef colRef); - long getSourceOffsetStart(); - - long getSourceOffsetEnd(); + SegmentRange getSegRange(); TblColRef getDistributedBy(); TblColRef getClusterBy(); + // optionally present ISegment getSegment(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java index d46ea96..b8bee36 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java @@ -18,19 +18,20 @@ package org.apache.kylin.metadata.model; -public interface ISegment { +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metadata.model.SegmentRange.TSRange; - public String getName(); - - public long getDateRangeStart(); - - public long getDateRangeEnd(); - - public boolean isSourceOffsetsOn(); +public interface ISegment extends Comparable<ISegment> { - public long getSourceOffsetStart(); + public KylinConfig getConfig(); + + public String getName(); - public long getSourceOffsetEnd(); + public boolean isOffsetCube(); + + public SegmentRange getSegRange(); + + public TSRange getTSRange(); public DataModelDesc getModel(); http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegmentAdvisor.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegmentAdvisor.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegmentAdvisor.java new file mode 100644 index 0000000..00337f4 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegmentAdvisor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.model; + +import org.apache.kylin.metadata.model.SegmentRange.TSRange; + +public interface ISegmentAdvisor { + + boolean isOffsetCube(); + + SegmentRange getSegRange(); + + void setSegRange(SegmentRange range); + + TSRange getTSRange(); + + void setTSRange(TSRange range); +} + http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java index eb82ace..c8a5110 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java @@ -18,16 +18,17 @@ package org.apache.kylin.metadata.model; +import java.io.Serializable; +import java.util.Arrays; + import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import java.io.Serializable; -import java.util.Arrays; - /** */ +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class JoinDesc implements Serializable { http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java index deec4f2..ae4e4e6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; /** */ +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class MeasureDesc implements Serializable { http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java index 3c00149..cfa062b 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java @@ -18,6 +18,8 @@ package org.apache.kylin.metadata.model; +import java.io.Serializable; + import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.DateFormat; @@ -27,14 +29,13 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonProperty; -import java.io.Serializable; - /** */ +@SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class PartitionDesc implements Serializable { - public static enum PartitionType implements Serializable{ + public static enum PartitionType implements Serializable { APPEND, // UPDATE_INSERT // not used since 0.7.1 } @@ -80,7 +81,7 @@ public class PartitionDesc implements Serializable { public boolean partitionColumnIsYmdInt() { if (partitionDateColumnRef == null) return false; - + DataType type = partitionDateColumnRef.getType(); return (type.isInt() || type.isBigInt()) && DateFormat.isDatePattern(partitionDateFormat); } @@ -88,7 +89,7 @@ public class PartitionDesc implements Serializable { public boolean partitionColumnIsTimeMillis() { if (partitionDateColumnRef == null) return false; - + DataType type = partitionDateColumnRef.getType(); return type.isBigInt() && !DateFormat.isDatePattern(partitionDateFormat); } @@ -105,12 +106,12 @@ public class PartitionDesc implements Serializable { public void setPartitionDateColumn(String partitionDateColumn) { this.partitionDateColumn = partitionDateColumn; } - + // for test void setPartitionDateColumnRef(TblColRef partitionDateColumnRef) { this.partitionDateColumnRef = partitionDateColumnRef; } - + public String getPartitionTimeColumn() { return partitionTimeColumn; } @@ -124,7 +125,7 @@ public class PartitionDesc implements Serializable { void setPartitionTimeColumnRef(TblColRef partitionTimeColumnRef) { this.partitionTimeColumnRef = partitionTimeColumnRef; } - + @Deprecated public long getPartitionDateStart() { return partitionDateStart; @@ -170,37 +171,44 @@ public class PartitionDesc implements Serializable { public TblColRef getPartitionTimeColumnRef() { return partitionTimeColumnRef; } - + // ============================================================================ public static interface IPartitionConditionBuilder { - String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive); + String buildDateRangeCondition(PartitionDesc partDesc, SegmentRange segRange); } - public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder, java.io.Serializable { + public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder, Serializable { @Override - public String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive) { - StringBuilder builder = new StringBuilder(); + public String buildDateRangeCondition(PartitionDesc partDesc, SegmentRange segRange) { + long startInclusive = (Long) segRange.start.v; + long endExclusive = (Long) segRange.end.v; + TblColRef partitionDateColumn = partDesc.getPartitionDateColumnRef(); TblColRef partitionTimeColumn = partDesc.getPartitionTimeColumnRef(); + StringBuilder builder = new StringBuilder(); if (partDesc.partitionColumnIsYmdInt()) { buildSingleColumnRangeCondAsYmdInt(builder, partitionDateColumn, startInclusive, endExclusive); } else if (partDesc.partitionColumnIsTimeMillis()) { buildSingleColumnRangeCondAsTimeMillis(builder, partitionDateColumn, startInclusive, endExclusive); } else if (partitionDateColumn != null && partitionTimeColumn == null) { - buildSingleColumnRangeCondition(builder, partitionDateColumn, startInclusive, endExclusive, partDesc.getPartitionDateFormat()); + buildSingleColumnRangeCondition(builder, partitionDateColumn, startInclusive, endExclusive, + partDesc.getPartitionDateFormat()); } else if (partitionDateColumn == null && partitionTimeColumn != null) { - buildSingleColumnRangeCondition(builder, partitionTimeColumn, startInclusive, endExclusive, partDesc.getPartitionTimeFormat()); + buildSingleColumnRangeCondition(builder, partitionTimeColumn, startInclusive, endExclusive, + partDesc.getPartitionTimeFormat()); } else if (partitionDateColumn != null && partitionTimeColumn != null) { - buildMultipleColumnRangeCondition(builder, partitionDateColumn, partitionTimeColumn, startInclusive, endExclusive, partDesc.getPartitionDateFormat(), partDesc.getPartitionTimeFormat()); + buildMultipleColumnRangeCondition(builder, partitionDateColumn, partitionTimeColumn, startInclusive, + endExclusive, partDesc.getPartitionDateFormat(), partDesc.getPartitionTimeFormat()); } return builder.toString(); } - private static void buildSingleColumnRangeCondAsTimeMillis(StringBuilder builder, TblColRef partitionColumn, long startInclusive, long endExclusive) { + private static void buildSingleColumnRangeCondAsTimeMillis(StringBuilder builder, TblColRef partitionColumn, + long startInclusive, long endExclusive) { String partitionColumnName = partitionColumn.getIdentity(); if (startInclusive > 0) { builder.append(partitionColumnName + " >= " + startInclusive); @@ -209,35 +217,47 @@ public class PartitionDesc implements Serializable { builder.append(partitionColumnName + " < " + endExclusive); } - private static void buildSingleColumnRangeCondAsYmdInt(StringBuilder builder, TblColRef partitionColumn, long startInclusive, long endExclusive) { + private static void buildSingleColumnRangeCondAsYmdInt(StringBuilder builder, TblColRef partitionColumn, + long startInclusive, long endExclusive) { String partitionColumnName = partitionColumn.getIdentity(); if (startInclusive > 0) { - builder.append(partitionColumnName + " >= " + DateFormat.formatToDateStr(startInclusive, DateFormat.COMPACT_DATE_PATTERN)); + builder.append(partitionColumnName + " >= " + + DateFormat.formatToDateStr(startInclusive, DateFormat.COMPACT_DATE_PATTERN)); builder.append(" AND "); } - builder.append(partitionColumnName + " < " + DateFormat.formatToDateStr(endExclusive, DateFormat.COMPACT_DATE_PATTERN)); + builder.append(partitionColumnName + " < " + + DateFormat.formatToDateStr(endExclusive, DateFormat.COMPACT_DATE_PATTERN)); } - private static void buildSingleColumnRangeCondition(StringBuilder builder, TblColRef partitionColumn, long startInclusive, long endExclusive, String partitionColumnDateFormat) { + private static void buildSingleColumnRangeCondition(StringBuilder builder, TblColRef partitionColumn, + long startInclusive, long endExclusive, String partitionColumnDateFormat) { String partitionColumnName = partitionColumn.getIdentity(); if (startInclusive > 0) { - builder.append(partitionColumnName + " >= '" + DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat) + "'"); + builder.append(partitionColumnName + " >= '" + + DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat) + "'"); builder.append(" AND "); } - builder.append(partitionColumnName + " < '" + DateFormat.formatToDateStr(endExclusive, partitionColumnDateFormat) + "'"); + builder.append(partitionColumnName + " < '" + + DateFormat.formatToDateStr(endExclusive, partitionColumnDateFormat) + "'"); } - private static void buildMultipleColumnRangeCondition(StringBuilder builder, TblColRef partitionDateColumn, TblColRef partitionTimeColumn, long startInclusive, long endExclusive, String partitionColumnDateFormat, String partitionColumnTimeFormat) { + private static void buildMultipleColumnRangeCondition(StringBuilder builder, TblColRef partitionDateColumn, + TblColRef partitionTimeColumn, long startInclusive, long endExclusive, String partitionColumnDateFormat, + String partitionColumnTimeFormat) { String partitionDateColumnName = partitionDateColumn.getIdentity(); String partitionTimeColumnName = partitionTimeColumn.getIdentity(); if (startInclusive > 0) { builder.append("("); builder.append("("); - builder.append(partitionDateColumnName + " = '" + DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat) + "'").append(" AND ").append(partitionTimeColumnName + " >= '" + DateFormat.formatToDateStr(startInclusive, partitionColumnTimeFormat) + "'"); + builder.append(partitionDateColumnName + " = '" + + DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat) + "'").append(" AND ") + .append(partitionTimeColumnName + " >= '" + + DateFormat.formatToDateStr(startInclusive, partitionColumnTimeFormat) + "'"); builder.append(")"); builder.append(" OR "); builder.append("("); - builder.append(partitionDateColumnName + " > '" + DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat) + "'"); + builder.append(partitionDateColumnName + " > '" + + DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat) + "'"); builder.append(")"); builder.append(")"); builder.append(" AND "); @@ -245,11 +265,15 @@ public class PartitionDesc implements Serializable { builder.append("("); builder.append("("); - builder.append(partitionDateColumnName + " = '" + DateFormat.formatToDateStr(endExclusive, partitionColumnDateFormat) + "'").append(" AND ").append(partitionTimeColumnName + " < '" + DateFormat.formatToDateStr(endExclusive, partitionColumnTimeFormat) + "'"); + builder.append(partitionDateColumnName + " = '" + + DateFormat.formatToDateStr(endExclusive, partitionColumnDateFormat) + "'").append(" AND ") + .append(partitionTimeColumnName + " < '" + + DateFormat.formatToDateStr(endExclusive, partitionColumnTimeFormat) + "'"); builder.append(")"); builder.append(" OR "); builder.append("("); - builder.append(partitionDateColumnName + " < '" + DateFormat.formatToDateStr(endExclusive, partitionColumnDateFormat) + "'"); + builder.append(partitionDateColumnName + " < '" + + DateFormat.formatToDateStr(endExclusive, partitionColumnDateFormat) + "'"); builder.append(")"); builder.append(")"); } @@ -259,15 +283,18 @@ public class PartitionDesc implements Serializable { * Another implementation of IPartitionConditionBuilder, for the fact tables which have three partition columns "YEAR", "MONTH", and "DAY"; This * class will concat the three columns into yyyy-MM-dd format for query hive; */ - public static class YearMonthDayPartitionConditionBuilder implements PartitionDesc.IPartitionConditionBuilder { + public static class YearMonthDayPartitionConditionBuilder implements IPartitionConditionBuilder { @Override - public String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive) { + public String buildDateRangeCondition(PartitionDesc partDesc, SegmentRange segRange) { + long startInclusive = (Long) segRange.start.v; + long endExclusive = (Long) segRange.end.v; TblColRef partitionColumn = partDesc.getPartitionDateColumnRef(); String tableAlias = partitionColumn.getTableAlias(); - String concatField = String.format("CONCAT(%s.YEAR,'-',%s.MONTH,'-',%s.DAY)", tableAlias, tableAlias, tableAlias); + String concatField = String.format("CONCAT(%s.YEAR,'-',%s.MONTH,'-',%s.DAY)", tableAlias, tableAlias, + tableAlias); StringBuilder builder = new StringBuilder(); if (startInclusive > 0) {