KYLIN-2818 Refactor dateRange & sourceOffset on CubeSegment

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/75fbdcff
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/75fbdcff
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/75fbdcff

Branch: refs/heads/2622-2764
Commit: 75fbdcffb0366cb555783a229d1a514d010abd87
Parents: a54f016
Author: Li Yang <liy...@apache.org>
Authored: Fri Sep 1 15:18:36 2017 +0800
Committer: Hongbin Ma <m...@kyligence.io>
Committed: Sat Sep 2 18:47:28 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../org/apache/kylin/common/util/ClassUtil.java |   1 -
 .../main/resources/kylin-defaults.properties    |   1 +
 .../org/apache/kylin/cube/CubeInstance.java     |  12 +-
 .../java/org/apache/kylin/cube/CubeManager.java | 164 ++++++--------
 .../java/org/apache/kylin/cube/CubeSegment.java | 136 ++++++------
 .../apache/kylin/cube/CubeSegmentAdvisor.java   |  95 ++++++++
 .../org/apache/kylin/cube/CubeValidator.java    | 132 -----------
 .../cube/gridtable/SegmentGTStartAndEnd.java    |  11 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |   2 +-
 .../cube/model/CubeJoinedFlatTableDesc.java     |  14 +-
 .../cube/model/CubeJoinedFlatTableEnrich.java   |  18 +-
 .../org/apache/kylin/cube/CubeManagerTest.java  |  49 +++--
 .../org/apache/kylin/cube/CubeSegmentsTest.java |  52 ++---
 .../org/apache/kylin/job/JoinedFlatTable.java   |   8 +-
 .../metadata/model/ComputedColumnDesc.java      |   1 +
 .../metadata/model/IJoinedFlatTableDesc.java    |   5 +-
 .../apache/kylin/metadata/model/ISegment.java   |  21 +-
 .../kylin/metadata/model/ISegmentAdvisor.java   |  35 +++
 .../apache/kylin/metadata/model/JoinDesc.java   |   7 +-
 .../kylin/metadata/model/MeasureDesc.java       |   1 +
 .../kylin/metadata/model/PartitionDesc.java     |  89 +++++---
 .../kylin/metadata/model/SegmentRange.java      | 220 +++++++++++++++++++
 .../apache/kylin/metadata/model/Segments.java   | 166 +++++++++++---
 .../apache/kylin/metadata/model/TableRef.java   |   1 +
 .../apache/kylin/metadata/model/TblColRef.java  |   2 +-
 .../apache/kylin/source/SourcePartition.java    |  57 ++---
 .../DefaultPartitionConditionBuilderTest.java   |  20 +-
 .../kylin/storage/translate/HBaseKeyRange.java  |   2 +-
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |   7 +-
 .../engine/mr/steps/MergeCuboidMapperTest.java  |   3 +-
 .../kylin/provision/BuildCubeWithEngine.java    |  14 +-
 .../kylin/provision/BuildCubeWithStream.java    |  19 +-
 .../.settings/org.eclipse.core.resources.prefs  |   1 -
 .../kylin/rest/controller/CubeController.java   |  40 ++--
 .../rest/controller2/CubeControllerV2.java      |  40 ++--
 .../apache/kylin/rest/job/HybridCubeCLI.java    |  16 +-
 .../apache/kylin/rest/service/CubeService.java  |  13 +-
 .../apache/kylin/rest/service/JobService.java   |  58 ++---
 .../apache/kylin/rest/service/ModelService.java |  10 +-
 .../rest/controller/CubeControllerTest.java     |  10 +-
 .../kylin/rest/service/CacheServiceTest.java    |   3 +-
 .../apache/kylin/source/hive/HiveSource.java    |   3 +-
 .../apache/kylin/source/jdbc/JdbcSource.java    |   3 +-
 .../apache/kylin/source/kafka/KafkaSource.java  |  22 +-
 .../kylin/source/kafka/job/MergeOffsetStep.java |  18 +-
 .../hbase/util/ExtendCubeToHybridCLI.java       |   6 +-
 .../kylin/tool/ExtendCubeToHybridCLI.java       |   6 +-
 48 files changed, 965 insertions(+), 653 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e5fc3d6..9533400 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -335,6 +335,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.cube.cuboid-scheduler", 
"org.apache.kylin.cube.cuboid.DefaultCuboidScheduler");
     }
 
+    public String getSegmentAdvisor() {
+        return getOptional("kylin.cube.segment-advisor", 
"org.apache.kylin.cube.CubeSegmentAdvisor");
+    }
+    
     public double getJobCuboidSizeRatio() {
         return 
Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
index 4486528..a54a24d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ClassUtil.java
@@ -66,7 +66,6 @@ public class ClassUtil {
         
classRenameMap.put("org.apache.kylin.rest.util.KeywordDefaultDirtyHack", 
"org.apache.kylin.query.util.KeywordDefaultDirtyHack");
     }
 
-    @SuppressWarnings("unchecked")
     public static <T> Class<? extends T> forName(String name, Class<T> clz) 
throws ClassNotFoundException {
         name = forRenamedClass(name);
         return (Class<? extends T>) Class.forName(name);

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-common/src/main/resources/kylin-defaults.properties
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 7c421f9..f0328fa 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -146,6 +146,7 @@ kylin.engine.mr.uhc-reducer-count=1
 ### CUBE | DICTIONARY ###
 
 kylin.cube.cuboid-scheduler=org.apache.kylin.cube.cuboid.DefaultCuboidScheduler
+kylin.cube.segment-advisor=org.apache.kylin.cube.CubeSegmentAdvisor
 
 # 'auto', 'inmem', 'layer' or 'random' for testing 
 kylin.cube.algorithm=layer

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 4c57db8..fad942c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -26,13 +26,13 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -99,11 +99,11 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
     public CubeInstance() {
     }
 
-    public List<CubeSegment> getBuildingSegments() {
+    public Segments<CubeSegment> getBuildingSegments() {
         return segments.getBuildingSegments();
     }
 
-    public List<CubeSegment> getMergingSegments(CubeSegment mergedSegment) {
+    public Segments<CubeSegment> getMergingSegments(CubeSegment mergedSegment) 
{
         return segments.getMergingSegments(mergedSegment);
     }
 
@@ -337,12 +337,12 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
 
     @Override
     public long getDateRangeStart() {
-        return segments.getDateRangeStart();
+        return segments.getTSStart();
     }
 
     @Override
     public long getDateRangeEnd() {
-        return segments.getDateRangeEnd();
+        return segments.getTSEnd();
     }
 
     @Override
@@ -366,7 +366,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return this.getDescriptor().getAutoMergeTimeRanges() != null && 
this.getDescriptor().getAutoMergeTimeRanges().length > 0;
     }
 
-    public Pair<Long, Long> autoMergeCubeSegments() throws IOException {
+    public SegmentRange autoMergeCubeSegments() throws IOException {
         return segments.autoMergeCubeSegments(needAutoMerge(), getName(), 
getDescriptor().getAutoMergeTimeRanges());
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index e089198..043993c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -55,6 +55,8 @@ import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -404,7 +406,7 @@ public class CubeManager implements IRealizationProvider {
         }
 
         Collections.sort(newSegs);
-        CubeValidator.validate(newSegs);
+        newSegs.validate();
         cube.setSegments(newSegs);
 
         if (update.getStatus() != null) {
@@ -454,39 +456,38 @@ public class CubeManager implements IRealizationProvider {
 
     // append a full build segment
     public CubeSegment appendSegment(CubeInstance cube) throws IOException {
-        return appendSegment(cube, 0, Long.MAX_VALUE, 0, 0, null, null);
+        return appendSegment(cube, null, null, null, null);
     }
 
-    public CubeSegment appendSegment(CubeInstance cube, long startDate, long 
endDate) throws IOException {
-        return appendSegment(cube, startDate, endDate, 0, 0, null, null);
+    public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) 
throws IOException {
+        return appendSegment(cube, tsRange, null, null, null);
     }
 
-    public CubeSegment appendSegment(CubeInstance cube, SourcePartition 
sourcePartition) throws IOException {
-        return appendSegment(cube, sourcePartition.getStartDate(), 
sourcePartition.getEndDate(),
-                sourcePartition.getStartOffset(), 
sourcePartition.getEndOffset(),
-                sourcePartition.getSourcePartitionOffsetStart(), 
sourcePartition.getSourcePartitionOffsetEnd());
+    public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) 
throws IOException {
+        return appendSegment(cube, src.getTSRange(), src.getSegRange(), 
src.getSourcePartitionOffsetStart(),
+                src.getSourcePartitionOffsetEnd());
     }
 
-    CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, 
long startOffset, long endOffset,
+    CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange 
segRange,
             Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> 
sourcePartitionOffsetEnd)
             throws IOException {
+        checkInputRanges(tsRange, segRange);
         checkBuildingSegment(cube);
 
         // fix start/end a bit
         if (cube.getModel().getPartitionDesc().isPartitioned()) {
             // if missing start, set it to where last time ends
-            if (startDate == 0 && startOffset == 0 && cube.getLastSegment() != 
null) {
-                CubeSegment last = cube.getLastSegment();
-                startDate = last.isSourceOffsetsOn() ? 0 : 
last.getDateRangeEnd();
-                startOffset = last.isSourceOffsetsOn() ? 
last.getSourceOffsetEnd() : 0;
+            CubeSegment last = cube.getLastSegment();
+            if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) {
+                tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v);
             }
         } else {
             // full build
-            startDate = startOffset = endOffset = 0;
-            endDate = Long.MAX_VALUE;
+            tsRange = null;
+            segRange = null;
         }
 
-        CubeSegment newSegment = newSegment(cube, startDate, endDate, 
startOffset, endOffset);
+        CubeSegment newSegment = newSegment(cube, tsRange, segRange);
         newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
         newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
         validateNewSegments(cube, newSegment);
@@ -497,22 +498,22 @@ public class CubeManager implements IRealizationProvider {
         return newSegment;
     }
 
-    public CubeSegment refreshSegment(CubeInstance cube, long startDate, long 
endDate, long startOffset, long endOffset)
+    public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, 
SegmentRange segRange)
             throws IOException {
+        checkInputRanges(tsRange, segRange);
         checkBuildingSegment(cube);
 
-        CubeSegment newSegment = newSegment(cube, startDate, endDate, 
startOffset, endOffset);
+        CubeSegment newSegment = newSegment(cube, tsRange, segRange);
 
-        Pair<Boolean, Boolean> pair = 
CubeValidator.fitInSegments(cube.getSegments(), newSegment);
+        Pair<Boolean, Boolean> pair = 
cube.getSegments().fitInSegments(newSegment);
         if (pair.getFirst() == false || pair.getSecond() == false)
             throw new IllegalArgumentException("The new refreshing segment " + 
newSegment
                     + " does not match any existing segment in cube " + cube);
 
-        if (startOffset > 0 || endOffset > 0) {
+        if (segRange != null) {
             CubeSegment toRefreshSeg = null;
             for (CubeSegment cubeSegment : cube.getSegments()) {
-                if (cubeSegment.getSourceOffsetStart() == startOffset
-                        && cubeSegment.getSourceOffsetEnd() == endOffset) {
+                if (cubeSegment.getSegRange().equals(segRange)) {
                     toRefreshSeg = cubeSegment;
                     break;
                 }
@@ -533,61 +534,48 @@ public class CubeManager implements IRealizationProvider {
         return newSegment;
     }
 
-    public CubeSegment mergeSegments(CubeInstance cube, long startDate, long 
endDate, long startOffset, long endOffset,
-            boolean force) throws IOException {
+    public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, 
SegmentRange segRange, boolean force)
+            throws IOException {
         if (cube.getSegments().isEmpty())
             throw new IllegalArgumentException("Cube " + cube + " has no 
segments");
-        if (startDate >= endDate && startOffset >= endOffset)
-            throw new IllegalArgumentException("Invalid merge range");
 
+        checkInputRanges(tsRange, segRange);
         checkBuildingSegment(cube);
         checkCubeIsPartitioned(cube);
 
-        boolean isOffsetsOn = cube.getSegments().get(0).isSourceOffsetsOn();
-
-        if (isOffsetsOn) {
+        if (cube.getSegments().getFirstSegment().isOffsetCube()) {
             // offset cube, merge by date range?
-            if (startOffset == endOffset) {
+            if (segRange == null && tsRange != null) {
                 Pair<CubeSegment, CubeSegment> pair = 
cube.getSegments(SegmentStatusEnum.READY)
-                        .findMergeOffsetsByDateRange(startDate, endDate, 
Long.MAX_VALUE);
+                        .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
                 if (pair == null)
-                    throw new IllegalArgumentException("Find no segments to 
merge by date range " + startDate + "-"
-                            + endDate + " for cube " + cube);
-                startOffset = pair.getFirst().getSourceOffsetStart();
-                endOffset = pair.getSecond().getSourceOffsetEnd();
+                    throw new IllegalArgumentException("Find no segments to 
merge by " + tsRange + " for cube " + cube);
+                segRange = new 
SegmentRange(pair.getFirst().getSegRange().start, 
pair.getSecond().getSegRange().end);
             }
-            startDate = 0;
-            endDate = 0;
+            tsRange = null;
+            Preconditions.checkArgument(segRange != null);
         } else {
-            // date range cube, make sure range is on dates
-            if (startDate == endDate) {
-                startDate = startOffset;
-                endDate = endOffset;
-            }
-            startOffset = 0;
-            endOffset = 0;
+            segRange = null;
+            Preconditions.checkArgument(tsRange != null);
         }
 
-        CubeSegment newSegment = newSegment(cube, startDate, endDate, 
startOffset, endOffset);
+        CubeSegment newSegment = newSegment(cube, tsRange, segRange);
 
-        List<CubeSegment> mergingSegments = 
cube.getMergingSegments(newSegment);
+        Segments<CubeSegment> mergingSegments = 
cube.getMergingSegments(newSegment);
         if (mergingSegments.size() <= 1)
-            throw new IllegalArgumentException(
-                    "Range " + newSegment.getSourceOffsetStart() + "-" + 
newSegment.getSourceOffsetEnd()
-                            + " must contain at least 2 segments, but there is 
" + mergingSegments.size());
+            throw new IllegalArgumentException("Range " + 
newSegment.getSegRange()
+                    + " must contain at least 2 segments, but there is " + 
mergingSegments.size());
 
         CubeSegment first = mergingSegments.get(0);
         CubeSegment last = mergingSegments.get(mergingSegments.size() - 1);
-        if (newSegment.isSourceOffsetsOn()) {
-            newSegment.setDateRangeStart(minDateRangeStart(mergingSegments));
-            newSegment.setDateRangeEnd(maxDateRangeEnd(mergingSegments));
-            newSegment.setSourceOffsetStart(first.getSourceOffsetStart());
-            newSegment.setSourceOffsetEnd(last.getSourceOffsetEnd());
+        if (first.isOffsetCube()) {
+            newSegment.setSegRange(new SegmentRange(first.getSegRange().start, 
last.getSegRange().end));
             
newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
             
newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+            newSegment.setTSRange(null);
         } else {
-            newSegment.setDateRangeStart(first.getSourceOffsetStart());
-            newSegment.setDateRangeEnd(last.getSourceOffsetEnd());
+            newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), 
mergingSegments.getTSEnd()));
+            newSegment.setSegRange(null);
         }
 
         if (force == false) {
@@ -613,27 +601,10 @@ public class CubeManager implements IRealizationProvider {
 
         return newSegment;
     }
-
-    public static long minDateRangeStart(List<CubeSegment> mergingSegments) {
-        long min = Long.MAX_VALUE;
-        for (CubeSegment seg : mergingSegments)
-            min = Math.min(min, seg.getDateRangeStart());
-        return min;
-    }
-
-    public static long maxDateRangeEnd(List<CubeSegment> mergingSegments) {
-        long max = Long.MIN_VALUE;
-        for (CubeSegment seg : mergingSegments)
-            max = Math.max(max, seg.getDateRangeEnd());
-        return max;
-    }
-
-    public CubeSegment getLatestSegment(CubeInstance cube) {
-        List<CubeSegment> existing = cube.getSegments();
-        if (existing.isEmpty()) {
-            return null;
-        } else {
-            return existing.get(existing.size() - 1);
+    
+    private void checkInputRanges(TSRange tsRange, SegmentRange segRange) {
+        if (tsRange != null && segRange != null) {
+            throw new IllegalArgumentException("Build or refresh cube segment 
either by TSRange or by SegmentRange, not both.");
         }
     }
 
@@ -690,15 +661,19 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
-    private CubeSegment newSegment(CubeInstance cube, long startDate, long 
endDate, long startOffset, long endOffset) {
+    private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, 
SegmentRange segRange) {
         CubeSegment segment = new CubeSegment();
         segment.setUuid(UUID.randomUUID().toString());
-        segment.setName(CubeSegment.makeSegmentName(startDate, endDate, 
startOffset, endOffset));
+        segment.setName(CubeSegment.makeSegmentName(tsRange, segRange));
         segment.setCreateTimeUTC(System.currentTimeMillis());
-        segment.setDateRangeStart(startDate);
-        segment.setDateRangeEnd(endDate);
-        segment.setSourceOffsetStart(startOffset);
-        segment.setSourceOffsetEnd(endOffset);
+        segment.setCubeInstance(cube);
+        
+        // let full build range be backward compatible
+        if (tsRange == null && segRange == null)
+            tsRange = new TSRange(0L, Long.MAX_VALUE);
+        
+        segment.setTSRange(tsRange);
+        segment.setSegRange(segRange);
         segment.setStatus(SegmentStatusEnum.NEW);
         segment.setStorageLocationIdentifier(generateStorageLocation());
 
@@ -724,10 +699,6 @@ public class CubeManager implements IRealizationProvider {
         return tableName;
     }
 
-    public Pair<Long, Long> autoMergeCubeSegments(CubeInstance cube) throws 
IOException {
-        return cube.autoMergeCubeSegments();
-    }
-
     public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment 
newSegment) throws IOException {
         if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
             throw new IllegalStateException(
@@ -923,25 +894,24 @@ public class CubeManager implements IRealizationProvider {
         }
 
         Collections.sort(segments);
-        boolean isOffsetOn = segments.get(0).isSourceOffsetsOn();
         for (int i = 0; i < segments.size() - 1; ++i) {
             CubeSegment first = segments.get(i);
             CubeSegment second = segments.get(i + 1);
-            if (first.getSourceOffsetEnd() == second.getSourceOffsetStart()) {
+            if (first.getSegRange().connects(second.getSegRange()))
                 continue;
-            } else if (first.getSourceOffsetEnd() < 
second.getSourceOffsetStart()) {
+            
+            if (first.getSegRange().apartBefore(second.getSegRange())) {
                 CubeSegment hole = new CubeSegment();
-                if (isOffsetOn == true) {
-                    hole.setSourceOffsetStart(first.getSourceOffsetEnd());
+                hole.setCubeInstance(cube);
+                if (first.isOffsetCube()) {
+                    hole.setSegRange(new SegmentRange(first.getSegRange().end, 
second.getSegRange().start));
                     
hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
-                    hole.setSourceOffsetEnd(second.getSourceOffsetStart());
                     
hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
+                    hole.setName(CubeSegment.makeSegmentName(null, 
hole.getSegRange()));
                 } else {
-                    hole.setDateRangeStart(first.getDateRangeEnd());
-                    hole.setDateRangeEnd(second.getDateRangeStart());
+                    hole.setTSRange(new TSRange(first.getTSRange().end.v, 
second.getTSRange().start.v));
+                    
hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null));
                 }
-                
hole.setName(CubeSegment.makeSegmentName(hole.getDateRangeStart(), 
hole.getDateRangeEnd(),
-                        hole.getSourceOffsetStart(), 
hole.getSourceOffsetEnd()));
                 holes.add(hole);
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index bb55c1a..358183e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.cube;
 
+import java.io.Serializable;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -37,6 +38,9 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.ISegmentAdvisor;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -50,8 +54,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
-public class CubeSegment implements Comparable<CubeSegment>, IBuildable, 
ISegment, java.io.Serializable {
+public class CubeSegment implements IBuildable, ISegment, Serializable {
 
     @JsonBackReference
     private CubeInstance cubeInstance;
@@ -114,32 +119,27 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
     private Map<String, String> additionalInfo = new LinkedHashMap<String, 
String>();
 
     private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // 
cuboid id ==> base(starting) shard for this cuboid
+    
+    // lazy init
+    transient ISegmentAdvisor advisor = null;
 
     public CubeDesc getCubeDesc() {
         return getCubeInstance().getDescriptor();
     }
 
-    /**
-     * @param startDate
-     * @param endDate
-     * @return if(startDate == 0 && endDate == 0), returns "FULL_BUILD", else
-     * returns "yyyyMMddHHmmss_yyyyMMddHHmmss"
-     */
-    public static String makeSegmentName(long startDate, long endDate, long 
startOffset, long endOffset) {
-        if (startOffset == 0 && startDate == 0 //
-                && (endOffset == 0 || endOffset == Long.MAX_VALUE) //
-                && (endDate == 0 || endDate == Long.MAX_VALUE)) {
+    public static String makeSegmentName(TSRange tsRange, SegmentRange 
segRange) {
+        if (tsRange == null && segRange == null) {
             return "FULL_BUILD";
         }
 
-        if (startOffset != 0 || endOffset != 0) {
-            return startOffset + "_" + endOffset;
+        if (segRange != null) {
+            return segRange.start.v + "_" + segRange.end.v;
         }
 
         // using time
         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
         dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
-        return dateFormat.format(startDate) + "_" + dateFormat.format(endDate);
+        return dateFormat.format(tsRange.start.v) + "_" + 
dateFormat.format(tsRange.end.v);
     }
 
     // 
============================================================================
@@ -164,22 +164,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
         this.name = name;
     }
 
-    public long getDateRangeStart() {
-        return dateRangeStart;
-    }
-
-    public void setDateRangeStart(long dateRangeStart) {
-        this.dateRangeStart = dateRangeStart;
-    }
-
-    public long getDateRangeEnd() {
-        return dateRangeEnd;
-    }
-
-    public void setDateRangeEnd(long dateRangeEnd) {
-        this.dateRangeEnd = dateRangeEnd;
-    }
-
     public SegmentStatusEnum getStatus() {
         return status;
     }
@@ -254,7 +238,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
         return cubeInstance;
     }
 
-    public void setCubeInstance(CubeInstance cubeInstance) {
+    void setCubeInstance(CubeInstance cubeInstance) {
         this.cubeInstance = cubeInstance;
     }
 
@@ -335,63 +319,89 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen
         return new CubeDimEncMap(this);
     }
 
-    public boolean isSourceOffsetsOn() {
-        return sourceOffsetStart != 0 || sourceOffsetEnd != 0;
+    // Hide the 4 confusing fields: dateRangeStart, dateRangeEnd, sourceOffsetStart, sourceOffsetEnd.
+    // They are now managed via SegmentRange and TSRange.
+    long _getDateRangeStart() {
+        return dateRangeStart;
     }
 
-    // date range is used in place of source offsets when offsets are missing
-    public long getSourceOffsetStart() {
-        return isSourceOffsetsOn() ? sourceOffsetStart : dateRangeStart;
+    void _setDateRangeStart(long dateRangeStart) {
+        this.dateRangeStart = dateRangeStart;
     }
 
-    public void setSourceOffsetStart(long sourceOffsetStart) {
-        this.sourceOffsetStart = sourceOffsetStart;
+    long _getDateRangeEnd() {
+        return dateRangeEnd;
     }
 
-    // date range is used in place of source offsets when offsets are missing
-    public long getSourceOffsetEnd() {
-        return isSourceOffsetsOn() ? sourceOffsetEnd : dateRangeEnd;
+    void _setDateRangeEnd(long dateRangeEnd) {
+        this.dateRangeEnd = dateRangeEnd;
     }
 
-    public void setSourceOffsetEnd(long sourceOffsetEnd) {
-        this.sourceOffsetEnd = sourceOffsetEnd;
+    long _getSourceOffsetStart() {
+        return sourceOffsetStart;
     }
 
-    // date range is used in place of source offsets when offsets are missing
-    public boolean sourceOffsetOverlaps(CubeSegment seg) {
-        return Segments.sourceOffsetOverlaps(this, seg);
+    void _setSourceOffsetStart(long sourceOffsetStart) {
+        this.sourceOffsetStart = sourceOffsetStart;
     }
 
-    // date range is used in place of source offsets when offsets are missing
-    public boolean sourceOffsetContains(ISegment seg) {
-        return Segments.sourceOffsetContains(this, seg);
+    long _getSourceOffsetEnd() {
+        return sourceOffsetEnd;
     }
 
+    void _setSourceOffsetEnd(long sourceOffsetEnd) {
+        this.sourceOffsetEnd = sourceOffsetEnd;
+    }
+    
     @Override
-    public void validate() throws IllegalStateException {
-        if (cubeInstance.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
-            if (!isSourceOffsetsOn() && dateRangeStart >= dateRangeEnd)
-                throw new IllegalStateException("Invalid segment, dateRangeStart(" + dateRangeStart + ") must be smaller than dateRangeEnd(" + dateRangeEnd + ") in segment " + this);
-            if (isSourceOffsetsOn() && sourceOffsetStart >= sourceOffsetEnd)
-                throw new IllegalStateException("Invalid segment, sourceOffsetStart(" + sourceOffsetStart + ") must be smaller than sourceOffsetEnd(" + sourceOffsetEnd + ") in segment " + this);
+    public SegmentRange getSegRange() {
+        return getAdvisor().getSegRange();
+    }
+    
+    public void setSegRange(SegmentRange range) {
+        getAdvisor().setSegRange(range);
+    }
+
+    @Override
+    public TSRange getTSRange() {
+        return getAdvisor().getTSRange();
+    }
+    
+    public void setTSRange(TSRange range) {
+        getAdvisor().setTSRange(range);
+    }
+    
+    public boolean isOffsetCube() {
+        return getAdvisor().isOffsetCube();
+    }
+    
+    private ISegmentAdvisor getAdvisor() {
+        if (advisor != null)
+            return advisor;
+        
+        synchronized (this) {
+            if (advisor == null) {
+                advisor = Segments.newSegmentAdvisor(this);
+            }
+            return advisor;
         }
     }
     
+    @Override
+    public void validate() throws IllegalStateException {
+    }
+    
     public String getProject() {
         return getCubeDesc().getProject();
     }
 
     @Override
-    public int compareTo(CubeSegment other) {
-        long comp = this.getSourceOffsetStart() - other.getSourceOffsetStart();
+    public int compareTo(ISegment other) {
+        int comp = this.getSegRange().start.compareTo(other.getSegRange().start);
         if (comp != 0)
-            return comp < 0 ? -1 : 1;
+            return comp;
 
-        comp = this.getSourceOffsetEnd() - other.getSourceOffsetEnd();
-        if (comp != 0)
-            return comp < 0 ? -1 : 1;
-        else
-            return 0;
+        return this.getSegRange().end.compareTo(other.getSegRange().end);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeSegmentAdvisor.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegmentAdvisor.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegmentAdvisor.java
new file mode 100644
index 0000000..584cf4e
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegmentAdvisor.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.ISegmentAdvisor;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+
+public class CubeSegmentAdvisor implements ISegmentAdvisor {
+
+    private final CubeSegment seg;
+
+    // these are just cache of segment attributes, all changes must write through to 'seg'
+    private TSRange tsRange;
+    private SegmentRange segRange;
+
+    public CubeSegmentAdvisor(ISegment segment) {
+        this.seg = (CubeSegment) segment;
+    }
+    
+    @Override
+    public boolean isOffsetCube() {
+        return seg._getSourceOffsetStart() != 0 || seg._getSourceOffsetEnd() != 0;
+    }
+
+    @Override
+    public SegmentRange getSegRange() {
+        if (segRange != null)
+            return segRange;
+
+        // backward compatible with pre-streaming metadata, TSRange can imply SegmentRange
+        segRange = isOffsetCube() //
+                ? new SegmentRange(seg._getSourceOffsetStart(), seg._getSourceOffsetEnd()) //
+                : getTSRange();
+
+        return segRange;
+    }
+
+    @Override
+    public void setSegRange(SegmentRange range) {
+        // backward compatible with pre-streaming metadata, TSRange can imply SegmentRange
+        if (range == null) {
+            seg._setSourceOffsetStart(0);
+            seg._setSourceOffsetEnd(0);
+        } else {
+            seg._setSourceOffsetStart((Long) range.start.v);
+            seg._setSourceOffsetEnd((Long) range.end.v);
+        }
+        clear();
+    }
+
+    @Override
+    public TSRange getTSRange() {
+        if (tsRange != null)
+            return tsRange;
+        
+        tsRange = new TSRange(seg._getDateRangeStart(), seg._getDateRangeEnd());
+        return tsRange;
+    }
+
+    @Override
+    public void setTSRange(TSRange range) {
+        if (range == null) {
+            seg._setDateRangeStart(0);
+            seg._setDateRangeEnd(0);
+        } else {
+            seg._setDateRangeStart(range.start.v);
+            seg._setDateRangeEnd(range.end.v);
+        }
+        clear();
+    }
+
+    private void clear() {
+        tsRange = null;
+        segRange = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/CubeValidator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeValidator.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeValidator.java
deleted file mode 100644
index f94752f..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeValidator.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-public class CubeValidator {
-    private static final Logger logger = 
LoggerFactory.getLogger(CubeValidator.class);
-
-    /**
-     * Validates:
-     * - consistent isOffsetsOn()
-     * - for all ready segments, sourceOffset MUST have no overlaps, SHOULD 
have no holes
-     * - for all new segments, sourceOffset MUST have no overlaps, MUST 
contain a ready segment if overlaps with it
-     * - for all new segments, sourceOffset SHOULD fit/connect another segments
-     * - dateRange does not matter any more
-     */
-    public static void validate(Collection<CubeSegment> segments) {
-        if (segments == null || segments.isEmpty())
-            return;
-
-        // make a copy, don't modify given list
-        List<CubeSegment> all = Lists.newArrayList(segments);
-        Collections.sort(all);
-
-        // check consistent isOffsetsOn()
-        boolean isOffsetsOn = all.get(0).isSourceOffsetsOn();
-        for (CubeSegment seg : all) {
-            seg.validate();
-            if (seg.isSourceOffsetsOn() != isOffsetsOn)
-                throw new IllegalStateException("Inconsistent isOffsetsOn for 
segment " + seg);
-        }
-
-        List<CubeSegment> ready = Lists.newArrayListWithCapacity(all.size());
-        List<CubeSegment> news = Lists.newArrayListWithCapacity(all.size());
-        for (CubeSegment seg : all) {
-            if (seg.getStatus() == SegmentStatusEnum.READY)
-                ready.add(seg);
-            else
-                news.add(seg);
-        }
-
-        // for all ready segments, sourceOffset MUST have no overlaps, SHOULD 
have no holes
-        CubeSegment pre = null;
-        for (CubeSegment seg : ready) {
-            if (pre != null) {
-                if (pre.sourceOffsetOverlaps(seg))
-                    throw new IllegalStateException("Segments overlap: " + pre 
+ " and " + seg);
-                if (pre.getSourceOffsetEnd() < seg.getSourceOffsetStart())
-                    logger.warn("Hole between adjacent READY segments " + pre 
+ " and " + seg);
-            }
-            pre = seg;
-        }
-
-        // for all other segments, sourceOffset MUST have no overlaps, MUST 
contain a ready segment if overlaps with it
-        pre = null;
-        for (CubeSegment seg : news) {
-            if (pre != null) {
-                if (pre.sourceOffsetOverlaps(seg))
-                    throw new IllegalStateException("Segments overlap: " + pre 
+ " and " + seg);
-            }
-            pre = seg;
-
-            for (CubeSegment aReady : ready) {
-                if (seg.sourceOffsetOverlaps(aReady) && 
!seg.sourceOffsetContains(aReady))
-                    throw new IllegalStateException("Segments overlap: " + 
aReady + " and " + seg);
-            }
-        }
-
-        // for all other segments, sourceOffset SHOULD fit/connect other 
segments
-        for (CubeSegment seg : news) {
-            Pair<Boolean, Boolean> pair = fitInSegments(all, seg);
-            boolean startFit = pair.getFirst();
-            boolean endFit = pair.getSecond();
-
-            if (!startFit)
-                logger.warn("NEW segment start does not fit/connect with other 
segments: " + seg);
-            if (!endFit)
-                logger.warn("NEW segment end does not fit/connect with other 
segments: " + seg);
-        }
-    }
-
-    public static Pair<Boolean, Boolean> fitInSegments(List<CubeSegment> 
segments, CubeSegment newOne) {
-        if (segments == null || segments.isEmpty())
-            return null;
-
-        CubeSegment first = segments.get(0);
-        CubeSegment last = segments.get(segments.size() - 1);
-        long start = newOne.getSourceOffsetStart();
-        long end = newOne.getSourceOffsetEnd();
-        boolean startFit = false;
-        boolean endFit = false;
-        for (CubeSegment sss : segments) {
-            if (sss == newOne)
-                continue;
-            startFit = startFit || (start == sss.getSourceOffsetStart() || 
start == sss.getSourceOffsetEnd());
-            endFit = endFit || (end == sss.getSourceOffsetStart() || end == 
sss.getSourceOffsetEnd());
-        }
-        if (!startFit && endFit && newOne == first)
-            startFit = true;
-        if (!endFit && startFit && newOne == last)
-            endFit = true;
-
-        return Pair.newPair(startFit, endFit);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
index 8ceb841..7ecbbd6 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java
@@ -28,6 +28,7 @@ import org.apache.kylin.dimension.AbstractDateDimEnc;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 
 public class SegmentGTStartAndEnd {
     private ISegment segment;
@@ -43,16 +44,18 @@ public class SegmentGTStartAndEnd {
     }
 
     public Pair<ByteArray, ByteArray> getSegmentStartAndEnd(int index) {
+        TSRange tsRange = segment.getTSRange();
+        
         ByteArray start;
-        if (segment.getDateRangeStart() != Long.MIN_VALUE) {
-            start = encodeTime(segment.getDateRangeStart(), index, 1);
+        if (!tsRange.start.isMin) {
+            start = encodeTime(tsRange.start.v, index, 1);
         } else {
             start = new ByteArray();
         }
 
         ByteArray end;
-        if (segment.getDateRangeEnd() != Long.MAX_VALUE) {
-            end = encodeTime(segment.getDateRangeEnd(), index, -1);
+        if (!tsRange.end.isMax) {
+            end = encodeTime(tsRange.end.v, index, -1);
         } else {
             end = new ByteArray();
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 0e22587..be523ed 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -189,7 +189,7 @@ public class CubeDesc extends RootPersistentEntity 
implements IEngineAware {
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private int parentForward = 3;
 
-    // cuboid scheduler lazy built
+    // lazy init
     transient private CuboidScheduler cuboidScheduler = null;
 
     public boolean isEnableSharding() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index c49d37a..f8b039a 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.cube.model;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -28,6 +29,7 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
@@ -36,7 +38,8 @@ import com.google.common.collect.Maps;
 
 /**
  */
-public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, 
java.io.Serializable {
+@SuppressWarnings("serial")
+public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, 
Serializable {
 
     protected final String tableName;
     protected final CubeDesc cubeDesc;
@@ -146,13 +149,8 @@ public class CubeJoinedFlatTableDesc implements 
IJoinedFlatTableDesc, java.io.Se
     }
 
     @Override
-    public long getSourceOffsetStart() {
-        return cubeSegment.getSourceOffsetStart();
-    }
-
-    @Override
-    public long getSourceOffsetEnd() {
-        return cubeSegment.getSourceOffsetEnd();
+    public SegmentRange getSegRange() {
+        return cubeSegment.getSegRange();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index e829aeb..223df7c 100644
--- 
a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ 
b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -18,20 +18,23 @@
 
 package org.apache.kylin.cube.model;
 
+import java.io.Serializable;
+import java.util.List;
+
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import java.util.List;
-
 /**
  * An enrich of IJoinedFlatTableDesc for cubes
  */
-public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, 
java.io.Serializable {
+@SuppressWarnings("serial")
+public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, 
Serializable {
 
     private CubeDesc cubeDesc;
     private IJoinedFlatTableDesc flatDesc;
@@ -113,13 +116,8 @@ public class CubeJoinedFlatTableEnrich implements 
IJoinedFlatTableDesc, java.io.
     }
 
     @Override
-    public long getSourceOffsetStart() {
-        return flatDesc.getSourceOffsetStart();
-    }
-
-    @Override
-    public long getSourceOffsetEnd() {
-        return flatDesc.getSourceOffsetEnd();
+    public SegmentRange getSegRange() {
+        return flatDesc.getSegRange();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 3cae37d..7dd7212 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -30,8 +30,9 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
@@ -108,10 +109,10 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
         assertEquals(0, cube.getSegments().size());
 
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0, null, null);
+        CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L), 
null, null, null);
         seg1.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0, null, 
null);
+        CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L), 
null, null, null);
         seg2.setStatus(SegmentStatusEnum.READY);
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -120,7 +121,7 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
 
         assertEquals(2, cube.getSegments().size());
 
-        Pair<Long, Long> mergedSeg = mgr.autoMergeCubeSegments(cube);
+        SegmentRange mergedSeg = cube.autoMergeCubeSegments();
 
         assertTrue(mergedSeg != null);
 
@@ -145,25 +146,25 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
         m4.put(1, 4000L);
 
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000, null, m1);
+        CubeSegment seg1 = mgr.appendSegment(cube, null, new SegmentRange(0L, 
1000L), null, m1);
         seg1.setStatus(SegmentStatusEnum.READY);
 
 
-        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000, m1, m2);
+        CubeSegment seg2 = mgr.appendSegment(cube, null, new 
SegmentRange(1000L, 2000L), m1, m2);
         seg2.setStatus(SegmentStatusEnum.READY);
 
 
-        CubeSegment seg3 = mgr.mergeSegments(cube, 0, 0, 0000, 2000, true);
+        CubeSegment seg3 = mgr.mergeSegments(cube, null, new SegmentRange(0L, 
2000L), true);
         seg3.setStatus(SegmentStatusEnum.NEW);
 
 
-        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000, m2, m3);
+        CubeSegment seg4 = mgr.appendSegment(cube, null, new 
SegmentRange(2000L, 3000L), m2, m3);
         seg4.setStatus(SegmentStatusEnum.NEW);
         seg4.setLastBuildJobID("test");
         seg4.setStorageLocationIdentifier("test");
 
 
-        CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000, m3, m4);
+        CubeSegment seg5 = mgr.appendSegment(cube, null, new 
SegmentRange(3000L, 4000L), m3, m4);
         seg5.setStatus(SegmentStatusEnum.READY);
 
         CubeUpdate cubeBuilder = new CubeUpdate(cube);
@@ -202,26 +203,26 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
         m4.put(1, 4000L);
 
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000, null, m1);
+        CubeSegment seg1 = mgr.appendSegment(cube, null, new SegmentRange(0L, 
1000L), null, m1);
         seg1.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000, m1, m2);
+        CubeSegment seg2 = mgr.appendSegment(cube, null, new 
SegmentRange(1000L, 2000L), m1, m2);
         seg2.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000, m2, m3);
+        CubeSegment seg3 = mgr.appendSegment(cube, null, new 
SegmentRange(2000L, 3000L), m2, m3);
         seg3.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000, m3, m4);
+        CubeSegment seg4 = mgr.appendSegment(cube, null, new 
SegmentRange(3000L, 4000L), m3, m4);
         seg4.setStatus(SegmentStatusEnum.READY);
 
 
 
-        CubeSegment merge1 = mgr.mergeSegments(cube, 0, 0, 0, 2000, true);
+        CubeSegment merge1 = mgr.mergeSegments(cube, null, new 
SegmentRange(0L, 2000L), true);
         merge1.setStatus(SegmentStatusEnum.NEW);
         merge1.setLastBuildJobID("test");
         merge1.setStorageLocationIdentifier("test");
 
-        CubeSegment merge2 = mgr.mergeSegments(cube, 0, 0, 2000, 4000, true);
+        CubeSegment merge2 = mgr.mergeSegments(cube, null, new 
SegmentRange(2000L, 4000L), true);
         merge2.setStatus(SegmentStatusEnum.NEW);
         merge2.setLastBuildJobID("test");
         merge2.setStorageLocationIdentifier("test");
@@ -267,41 +268,41 @@ public class CubeManagerTest extends 
LocalFileMetadataTestCase {
         assertEquals(0, cube.getSegments().size());
 
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000);
+        CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L));
         seg1.setStatus(SegmentStatusEnum.READY);
 
-        CubeSegment seg3 = mgr.appendSegment(cube, 2000, 4000);
+        CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 4000L));
         seg3.setStatus(SegmentStatusEnum.READY);
 
         assertEquals(2, cube.getSegments().size());
 
-        Pair<Long, Long> mergedSeg = mgr.autoMergeCubeSegments(cube);
+        SegmentRange mergedSeg = cube.autoMergeCubeSegments();
 
         assertTrue(mergedSeg == null);
 
         // append a new seg which will be merged
 
-        CubeSegment seg4 = mgr.appendSegment(cube, 4000, 8000);
+        CubeSegment seg4 = mgr.appendSegment(cube, new TSRange(4000L, 8000L));
         seg4.setStatus(SegmentStatusEnum.READY);
 
         assertEquals(3, cube.getSegments().size());
 
-        mergedSeg = mgr.autoMergeCubeSegments(cube);
+        mergedSeg = cube.autoMergeCubeSegments();
 
         assertTrue(mergedSeg != null);
-        assertTrue(mergedSeg.getFirst() == 2000 && mergedSeg.getSecond() == 
8000);
+        assertTrue((Long) mergedSeg.start.v == 2000 && (Long) mergedSeg.end.v 
== 8000);
 
         // fill the gap
 
-        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000);
+        CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L));
         seg2.setStatus(SegmentStatusEnum.READY);
 
         assertEquals(4, cube.getSegments().size());
 
-        mergedSeg = mgr.autoMergeCubeSegments(cube);
+        mergedSeg = cube.autoMergeCubeSegments();
 
         assertTrue(mergedSeg != null);
-        assertTrue(mergedSeg.getFirst() == 0 && mergedSeg.getSecond() == 8000);
+        assertTrue((Long) mergedSeg.start.v == 0 && (Long) mergedSeg.end.v == 
8000);
     }
 
     public CubeDescManager getCubeDescManager() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git 
a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java 
b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index de91dd2..64c6d68 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.junit.After;
 import org.junit.Before;
@@ -52,10 +54,8 @@ public class CubeSegmentsTest extends 
LocalFileMetadataTestCase {
 
         // first append, creates a new & single segment
         CubeSegment seg = mgr.appendSegment(cube);
-        assertEquals(0, seg.getDateRangeStart());
-        assertEquals(Long.MAX_VALUE, seg.getDateRangeEnd());
-        assertEquals(0, seg.getSourceOffsetStart());
-        assertEquals(Long.MAX_VALUE, seg.getSourceOffsetEnd());
+        assertEquals(new TSRange(0L, Long.MAX_VALUE), seg.getTSRange());
+        assertEquals(new TSRange(0L, Long.MAX_VALUE), seg.getSegRange());
         assertEquals(1, cube.getSegments().size());
 
         // second append, throw IllegalStateException because the first 
segment is not built
@@ -82,15 +82,13 @@ public class CubeSegmentsTest extends 
LocalFileMetadataTestCase {
 
         // append again, for non-partitioned cube, it becomes a full refresh
         CubeSegment seg2 = mgr.appendSegment(cube);
-        assertEquals(0, seg2.getDateRangeStart());
-        assertEquals(Long.MAX_VALUE, seg2.getDateRangeEnd());
-        assertEquals(0, seg2.getSourceOffsetStart());
-        assertEquals(Long.MAX_VALUE, seg2.getSourceOffsetEnd());
+        assertEquals(new TSRange(0L, Long.MAX_VALUE), seg2.getTSRange());
+        assertEquals(new TSRange(0L, Long.MAX_VALUE), seg2.getSegRange());
         assertEquals(2, cube.getSegments().size());
 
         // non-partitioned cannot merge, throw exception
         try {
-            mgr.mergeSegments(cube, 0, 0, 0, Long.MAX_VALUE, false);
+            mgr.mergeSegments(cube, null, new SegmentRange(0L, 
Long.MAX_VALUE), false);
             fail();
         } catch (IllegalStateException ex) {
             // good
@@ -106,28 +104,24 @@ public class CubeSegmentsTest extends 
LocalFileMetadataTestCase {
         assertEquals(0, cube.getSegments().size());
 
         // append first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000);
+        CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L));
         seg1.setStatus(SegmentStatusEnum.READY);
 
         // append second
-        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000);
+        CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L));
 
         assertEquals(2, cube.getSegments().size());
-        assertEquals(1000, seg2.getDateRangeStart());
-        assertEquals(2000, seg2.getDateRangeEnd());
-        assertEquals(1000, seg2.getSourceOffsetStart());
-        assertEquals(2000, seg2.getSourceOffsetEnd());
+        assertEquals(new TSRange(1000L, 2000L), seg2.getTSRange());
+        assertEquals(new TSRange(1000L, 2000L), seg2.getSegRange());
         assertEquals(SegmentStatusEnum.NEW, seg2.getStatus());
         seg2.setStatus(SegmentStatusEnum.READY);
 
         // merge first and second
-        CubeSegment merge = mgr.mergeSegments(cube, 0, 2000, 0, 0, true);
+        CubeSegment merge = mgr.mergeSegments(cube, new TSRange(0L, 2000L), 
null, true);
 
         assertEquals(3, cube.getSegments().size());
-        assertEquals(0, merge.getDateRangeStart());
-        assertEquals(2000, merge.getDateRangeEnd());
-        assertEquals(0, merge.getSourceOffsetStart());
-        assertEquals(2000, merge.getSourceOffsetEnd());
+        assertEquals(new TSRange(0L, 2000L), merge.getTSRange());
+        assertEquals(new TSRange(0L, 2000L), merge.getSegRange());
         assertEquals(SegmentStatusEnum.NEW, merge.getStatus());
 
         // segments are strictly ordered
@@ -140,18 +134,16 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
 
         // try merge at start/end at middle of segments
         try {
-            mgr.mergeSegments(cube, 500, 2500, 0, 0, true);
+            mgr.mergeSegments(cube, new TSRange(500L, 2500L), null, true);
             fail();
         } catch (IllegalArgumentException ex) {
             // good
         }
 
-        CubeSegment merge2 = mgr.mergeSegments(cube, 0, 2500, 0, 0, true);
+        CubeSegment merge2 = mgr.mergeSegments(cube, new TSRange(0L, 2500L), 
null, true);
         assertEquals(3, cube.getSegments().size());
-        assertEquals(0, merge2.getDateRangeStart());
-        assertEquals(2000, merge2.getDateRangeEnd());
-        assertEquals(0, merge2.getSourceOffsetStart());
-        assertEquals(2000, merge2.getSourceOffsetEnd());
+        assertEquals(new TSRange(0L, 2000L), merge2.getTSRange());
+        assertEquals(new TSRange(0L, 2000L), merge2.getSegRange());
     }
 
     @Test
@@ -164,25 +156,25 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
         assertEquals(0, cube.getSegments().size());
 
         // append the first
-        CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000);
+        CubeSegment seg1 = mgr.appendSegment(cube, new TSRange(0L, 1000L));
         seg1.setStatus(SegmentStatusEnum.READY);
         assertEquals(1, cube.getSegments().size());
 
         // append the third
-        CubeSegment seg3 = mgr.appendSegment(cube, 2000, 3000);
+        CubeSegment seg3 = mgr.appendSegment(cube, new TSRange(2000L, 3000L));
         seg3.setStatus(SegmentStatusEnum.READY);
         assertEquals(2, cube.getSegments().size());
 
         // reject overlap
         try {
-            mgr.appendSegment(cube, 1000, 2500);
+            mgr.appendSegment(cube, new TSRange(1000L, 2500L));
             fail();
         } catch (IllegalStateException ex) {
             // good
         }
 
         // append the second
-        CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000);
+        CubeSegment seg2 = mgr.appendSegment(cube, new TSRange(1000L, 2000L));
         seg2.setStatus(SegmentStatusEnum.READY);
         assertEquals(3, cube.getSegments().size());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index acb29e1..8dd5093 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -33,6 +33,7 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -216,12 +217,11 @@ public class JoinedFlatTable {
         if (flatDesc.getSegment() != null) {
             PartitionDesc partDesc = model.getPartitionDesc();
             if (partDesc != null && partDesc.getPartitionDateColumn() != null) 
{
-                long dateStart = flatDesc.getSourceOffsetStart();
-                long dateEnd = flatDesc.getSourceOffsetEnd();
+                SegmentRange segRange = flatDesc.getSegRange();
 
-                if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
+                if (segRange != null && !segRange.isInfinite()) {
                     whereBuilder.append(hasCondition ? " AND (" : " (");
-                    
whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc,
 dateStart, dateEnd));
+                    
whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc,
 segRange));
                     whereBuilder.append(")" + sep);
                     hasCondition = true;
                 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
index bd785c8..ea423ae 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ComputedColumnDesc.java
@@ -38,6 +38,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
 public class ComputedColumnDesc implements Serializable {
     private static final Logger logger = 
LoggerFactory.getLogger(ComputedColumnDesc.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index b545e50..0589829 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -32,14 +32,13 @@ public interface IJoinedFlatTableDesc {
 
     int getColumnIndex(TblColRef colRef);
 
-    long getSourceOffsetStart();
-
-    long getSourceOffsetEnd();
+    SegmentRange getSegRange();
 
     TblColRef getDistributedBy();
 
     TblColRef getClusterBy();
 
+    // optionally present
     ISegment getSegment();
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
index d46ea96..b8bee36 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
@@ -18,19 +18,20 @@
 
 package org.apache.kylin.metadata.model;
 
-public interface ISegment {
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 
-    public String getName();
-
-    public long getDateRangeStart();
-
-    public long getDateRangeEnd();
-
-    public boolean isSourceOffsetsOn();
+public interface ISegment extends Comparable<ISegment> {
 
-    public long getSourceOffsetStart();
+    public KylinConfig getConfig();
+    
+    public String getName();
 
-    public long getSourceOffsetEnd();
+    public boolean isOffsetCube();
+    
+    public SegmentRange getSegRange();
+    
+    public TSRange getTSRange();
 
     public DataModelDesc getModel();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegmentAdvisor.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegmentAdvisor.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegmentAdvisor.java
new file mode 100644
index 0000000..00337f4
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegmentAdvisor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.model;
+
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+
+public interface ISegmentAdvisor {
+    
+    boolean isOffsetCube();
+    
+    SegmentRange getSegRange();
+    
+    void setSegRange(SegmentRange range);
+    
+    TSRange getTSRange();
+    
+    void setTSRange(TSRange range);
+}
+

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
index eb82ace..c8a5110 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
@@ -18,16 +18,17 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
+import java.util.Arrays;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 
-import java.io.Serializable;
-import java.util.Arrays;
-
 /**
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
 public class JoinDesc implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index deec4f2..ae4e4e6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 /**
  */
 
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
 public class MeasureDesc implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/75fbdcff/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index 3c00149..cfa062b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
@@ -27,14 +29,13 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.Serializable;
-
 /**
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = 
Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = 
Visibility.NONE)
 public class PartitionDesc implements Serializable {
 
-    public static enum PartitionType implements Serializable{
+    public static enum PartitionType implements Serializable {
         APPEND, //
         UPDATE_INSERT // not used since 0.7.1
     }
@@ -80,7 +81,7 @@ public class PartitionDesc implements Serializable {
     public boolean partitionColumnIsYmdInt() {
         if (partitionDateColumnRef == null)
             return false;
-        
+
         DataType type = partitionDateColumnRef.getType();
         return (type.isInt() || type.isBigInt()) && 
DateFormat.isDatePattern(partitionDateFormat);
     }
@@ -88,7 +89,7 @@ public class PartitionDesc implements Serializable {
     public boolean partitionColumnIsTimeMillis() {
         if (partitionDateColumnRef == null)
             return false;
-        
+
         DataType type = partitionDateColumnRef.getType();
         return type.isBigInt() && 
!DateFormat.isDatePattern(partitionDateFormat);
     }
@@ -105,12 +106,12 @@ public class PartitionDesc implements Serializable {
     public void setPartitionDateColumn(String partitionDateColumn) {
         this.partitionDateColumn = partitionDateColumn;
     }
-    
+
     // for test
     void setPartitionDateColumnRef(TblColRef partitionDateColumnRef) {
         this.partitionDateColumnRef = partitionDateColumnRef;
     }
-    
+
     public String getPartitionTimeColumn() {
         return partitionTimeColumn;
     }
@@ -124,7 +125,7 @@ public class PartitionDesc implements Serializable {
     void setPartitionTimeColumnRef(TblColRef partitionTimeColumnRef) {
         this.partitionTimeColumnRef = partitionTimeColumnRef;
     }
-    
+
     @Deprecated
     public long getPartitionDateStart() {
         return partitionDateStart;
@@ -170,37 +171,44 @@ public class PartitionDesc implements Serializable {
     public TblColRef getPartitionTimeColumnRef() {
         return partitionTimeColumnRef;
     }
-    
+
     // 
============================================================================
 
     public static interface IPartitionConditionBuilder {
-        String buildDateRangeCondition(PartitionDesc partDesc, long 
startInclusive, long endExclusive);
+        String buildDateRangeCondition(PartitionDesc partDesc, SegmentRange 
segRange);
     }
 
-    public static class DefaultPartitionConditionBuilder implements 
IPartitionConditionBuilder, java.io.Serializable {
+    public static class DefaultPartitionConditionBuilder implements 
IPartitionConditionBuilder, Serializable {
 
         @Override
-        public String buildDateRangeCondition(PartitionDesc partDesc, long 
startInclusive, long endExclusive) {
-            StringBuilder builder = new StringBuilder();
+        public String buildDateRangeCondition(PartitionDesc partDesc, 
SegmentRange segRange) {
+            long startInclusive = (Long) segRange.start.v;
+            long endExclusive = (Long) segRange.end.v;
+            
             TblColRef partitionDateColumn = 
partDesc.getPartitionDateColumnRef();
             TblColRef partitionTimeColumn = 
partDesc.getPartitionTimeColumnRef();
+            StringBuilder builder = new StringBuilder();
 
             if (partDesc.partitionColumnIsYmdInt()) {
                 buildSingleColumnRangeCondAsYmdInt(builder, 
partitionDateColumn, startInclusive, endExclusive);
             } else if (partDesc.partitionColumnIsTimeMillis()) {
                 buildSingleColumnRangeCondAsTimeMillis(builder, 
partitionDateColumn, startInclusive, endExclusive);
             } else if (partitionDateColumn != null && partitionTimeColumn == 
null) {
-                buildSingleColumnRangeCondition(builder, partitionDateColumn, 
startInclusive, endExclusive, partDesc.getPartitionDateFormat());
+                buildSingleColumnRangeCondition(builder, partitionDateColumn, 
startInclusive, endExclusive,
+                        partDesc.getPartitionDateFormat());
             } else if (partitionDateColumn == null && partitionTimeColumn != 
null) {
-                buildSingleColumnRangeCondition(builder, partitionTimeColumn, 
startInclusive, endExclusive, partDesc.getPartitionTimeFormat());
+                buildSingleColumnRangeCondition(builder, partitionTimeColumn, 
startInclusive, endExclusive,
+                        partDesc.getPartitionTimeFormat());
             } else if (partitionDateColumn != null && partitionTimeColumn != 
null) {
-                buildMultipleColumnRangeCondition(builder, 
partitionDateColumn, partitionTimeColumn, startInclusive, endExclusive, 
partDesc.getPartitionDateFormat(), partDesc.getPartitionTimeFormat());
+                buildMultipleColumnRangeCondition(builder, 
partitionDateColumn, partitionTimeColumn, startInclusive,
+                        endExclusive, partDesc.getPartitionDateFormat(), 
partDesc.getPartitionTimeFormat());
             }
 
             return builder.toString();
         }
 
-        private static void 
buildSingleColumnRangeCondAsTimeMillis(StringBuilder builder, TblColRef 
partitionColumn, long startInclusive, long endExclusive) {
+        private static void 
buildSingleColumnRangeCondAsTimeMillis(StringBuilder builder, TblColRef 
partitionColumn,
+                long startInclusive, long endExclusive) {
             String partitionColumnName = partitionColumn.getIdentity();
             if (startInclusive > 0) {
                 builder.append(partitionColumnName + " >= " + startInclusive);
@@ -209,35 +217,47 @@ public class PartitionDesc implements Serializable {
             builder.append(partitionColumnName + " < " + endExclusive);
         }
 
-        private static void buildSingleColumnRangeCondAsYmdInt(StringBuilder 
builder, TblColRef partitionColumn, long startInclusive, long endExclusive) {
+        private static void buildSingleColumnRangeCondAsYmdInt(StringBuilder 
builder, TblColRef partitionColumn,
+                long startInclusive, long endExclusive) {
             String partitionColumnName = partitionColumn.getIdentity();
             if (startInclusive > 0) {
-                builder.append(partitionColumnName + " >= " + 
DateFormat.formatToDateStr(startInclusive, DateFormat.COMPACT_DATE_PATTERN));
+                builder.append(partitionColumnName + " >= "
+                        + DateFormat.formatToDateStr(startInclusive, 
DateFormat.COMPACT_DATE_PATTERN));
                 builder.append(" AND ");
             }
-            builder.append(partitionColumnName + " < " + 
DateFormat.formatToDateStr(endExclusive, DateFormat.COMPACT_DATE_PATTERN));
+            builder.append(partitionColumnName + " < "
+                    + DateFormat.formatToDateStr(endExclusive, 
DateFormat.COMPACT_DATE_PATTERN));
         }
 
-        private static void buildSingleColumnRangeCondition(StringBuilder 
builder, TblColRef partitionColumn, long startInclusive, long endExclusive, 
String partitionColumnDateFormat) {
+        private static void buildSingleColumnRangeCondition(StringBuilder 
builder, TblColRef partitionColumn,
+                long startInclusive, long endExclusive, String 
partitionColumnDateFormat) {
             String partitionColumnName = partitionColumn.getIdentity();
             if (startInclusive > 0) {
-                builder.append(partitionColumnName + " >= '" + 
DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat) + "'");
+                builder.append(partitionColumnName + " >= '"
+                        + DateFormat.formatToDateStr(startInclusive, 
partitionColumnDateFormat) + "'");
                 builder.append(" AND ");
             }
-            builder.append(partitionColumnName + " < '" + 
DateFormat.formatToDateStr(endExclusive, partitionColumnDateFormat) + "'");
+            builder.append(partitionColumnName + " < '"
+                    + DateFormat.formatToDateStr(endExclusive, 
partitionColumnDateFormat) + "'");
         }
 
-        private static void buildMultipleColumnRangeCondition(StringBuilder 
builder, TblColRef partitionDateColumn, TblColRef partitionTimeColumn, long 
startInclusive, long endExclusive, String partitionColumnDateFormat, String 
partitionColumnTimeFormat) {
+        private static void buildMultipleColumnRangeCondition(StringBuilder 
builder, TblColRef partitionDateColumn,
+                TblColRef partitionTimeColumn, long startInclusive, long 
endExclusive, String partitionColumnDateFormat,
+                String partitionColumnTimeFormat) {
             String partitionDateColumnName = partitionDateColumn.getIdentity();
             String partitionTimeColumnName = partitionTimeColumn.getIdentity();
             if (startInclusive > 0) {
                 builder.append("(");
                 builder.append("(");
-                builder.append(partitionDateColumnName + " = '" + 
DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat) + 
"'").append(" AND ").append(partitionTimeColumnName + " >= '" + 
DateFormat.formatToDateStr(startInclusive, partitionColumnTimeFormat) + "'");
+                builder.append(partitionDateColumnName + " = '"
+                        + DateFormat.formatToDateStr(startInclusive, 
partitionColumnDateFormat) + "'").append(" AND ")
+                        .append(partitionTimeColumnName + " >= '"
+                                + DateFormat.formatToDateStr(startInclusive, 
partitionColumnTimeFormat) + "'");
                 builder.append(")");
                 builder.append(" OR ");
                 builder.append("(");
-                builder.append(partitionDateColumnName + " > '" + 
DateFormat.formatToDateStr(startInclusive, partitionColumnDateFormat) + "'");
+                builder.append(partitionDateColumnName + " > '"
+                        + DateFormat.formatToDateStr(startInclusive, 
partitionColumnDateFormat) + "'");
                 builder.append(")");
                 builder.append(")");
                 builder.append(" AND ");
@@ -245,11 +265,15 @@ public class PartitionDesc implements Serializable {
 
             builder.append("(");
             builder.append("(");
-            builder.append(partitionDateColumnName + " = '" + 
DateFormat.formatToDateStr(endExclusive, partitionColumnDateFormat) + 
"'").append(" AND ").append(partitionTimeColumnName + " < '" + 
DateFormat.formatToDateStr(endExclusive, partitionColumnTimeFormat) + "'");
+            builder.append(partitionDateColumnName + " = '"
+                    + DateFormat.formatToDateStr(endExclusive, 
partitionColumnDateFormat) + "'").append(" AND ")
+                    .append(partitionTimeColumnName + " < '"
+                            + DateFormat.formatToDateStr(endExclusive, 
partitionColumnTimeFormat) + "'");
             builder.append(")");
             builder.append(" OR ");
             builder.append("(");
-            builder.append(partitionDateColumnName + " < '" + 
DateFormat.formatToDateStr(endExclusive, partitionColumnDateFormat) + "'");
+            builder.append(partitionDateColumnName + " < '"
+                    + DateFormat.formatToDateStr(endExclusive, 
partitionColumnDateFormat) + "'");
             builder.append(")");
             builder.append(")");
         }
@@ -259,15 +283,18 @@ public class PartitionDesc implements Serializable {
      * Another implementation of IPartitionConditionBuilder, for the fact 
tables which have three partition columns "YEAR", "MONTH", and "DAY"; This
      * class will concat the three columns into yyyy-MM-dd format for query 
hive;
      */
-    public static class YearMonthDayPartitionConditionBuilder implements 
PartitionDesc.IPartitionConditionBuilder {
+    public static class YearMonthDayPartitionConditionBuilder implements 
IPartitionConditionBuilder {
 
         @Override
-        public String buildDateRangeCondition(PartitionDesc partDesc, long 
startInclusive, long endExclusive) {
+        public String buildDateRangeCondition(PartitionDesc partDesc, 
SegmentRange segRange) {
+            long startInclusive = (Long) segRange.start.v;
+            long endExclusive = (Long) segRange.end.v;
 
             TblColRef partitionColumn = partDesc.getPartitionDateColumnRef();
             String tableAlias = partitionColumn.getTableAlias();
 
-            String concatField = 
String.format("CONCAT(%s.YEAR,'-',%s.MONTH,'-',%s.DAY)", tableAlias, 
tableAlias, tableAlias);
+            String concatField = 
String.format("CONCAT(%s.YEAR,'-',%s.MONTH,'-',%s.DAY)", tableAlias, tableAlias,
+                    tableAlias);
             StringBuilder builder = new StringBuilder();
 
             if (startInclusive > 0) {

Reply via email to