Repository: kylin Updated Branches: refs/heads/master 4a0ee7989 -> 0eebd7d8a
KYLIN-2245 refine CubeSegments Signed-off-by: Li Yang <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3ca53954 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3ca53954 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3ca53954 Branch: refs/heads/master Commit: 3ca53954415c62c2a91473c6d7bf831504b5e9db Parents: 4a0ee79 Author: Cheng Wang <cheng.w...@kyligence.io> Authored: Fri Dec 2 15:35:52 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Dec 2 17:26:07 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/CubeInstance.java | 115 +++------------ .../java/org/apache/kylin/cube/CubeManager.java | 10 +- .../java/org/apache/kylin/cube/CubeSegment.java | 8 +- .../apache/kylin/metadata/model/ISegment.java | 6 +- .../apache/kylin/metadata/model/Segments.java | 140 +++++++++++++++++++ .../hbase/util/ExtendCubeToHybridCLI.java | 4 +- .../kylin/tool/ExtendCubeToHybridCLI.java | 4 +- 7 files changed, 176 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/3ca53954/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 061ab23..8b12c2e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -18,8 +18,6 @@ package org.apache.kylin.cube; -import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -34,6 +32,7 @@ import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.CapabilityResult; import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence; @@ -64,7 +63,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, cubeInstance.setName(cubeName); cubeInstance.setDescName(cubeDesc.getName()); cubeInstance.setCreateTimeUTC(System.currentTimeMillis()); - cubeInstance.setSegments(new ArrayList<CubeSegment>()); + cubeInstance.setSegments(new Segments<CubeSegment>()); cubeInstance.setStatus(RealizationStatusEnum.DISABLED); cubeInstance.updateRandomUuid(); @@ -87,7 +86,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, @JsonManagedReference @JsonProperty("segments") - private List<CubeSegment> segments = new ArrayList<CubeSegment>(); + private Segments<CubeSegment> segments = new Segments<CubeSegment>(); @JsonProperty("create_time_utc") private long createTimeUTC; @@ -97,39 +96,11 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } public List<CubeSegment> getBuildingSegments() { - List<CubeSegment> buildingSegments = new ArrayList<CubeSegment>(); - if (null != segments) { - for (CubeSegment segment : segments) { - if (SegmentStatusEnum.NEW == segment.getStatus() || SegmentStatusEnum.READY_PENDING == segment.getStatus()) { - buildingSegments.add(segment); - } - } - } - - return buildingSegments; + return segments.getBuildingSegments(); } public List<CubeSegment> getMergingSegments(CubeSegment mergedSegment) { - LinkedList<CubeSegment> result = new LinkedList<CubeSegment>(); - if (mergedSegment == null) - return result; - - for (CubeSegment seg : this.segments) { - if (seg.getStatus() != SegmentStatusEnum.READY && seg.getStatus() != SegmentStatusEnum.READY_PENDING) - continue; - - if (seg == mergedSegment) - continue; - - if (mergedSegment.sourceOffsetContains(seg)) { - // make sure no holes - if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart()) - throw new IllegalStateException("Merging segments must not have holes between " + result.getLast() + " and " + seg); - - result.add(seg); - } - } - return result; + return segments.getMergingSegments(mergedSegment); } public CubeDesc getDescriptor() { @@ -154,7 +125,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, // in a temporary broken state, so that user can edit and fix it. Broken state is often due to // schema changes at source. public boolean allowBrokenDescriptor() { - return (getStatus() == RealizationStatusEnum.DISABLED || getStatus() == RealizationStatusEnum.DESCBROKEN) && segments.isEmpty(); + return (getStatus() == RealizationStatusEnum.DISABLED || getStatus() == RealizationStatusEnum.DESCBROKEN) && segments.isEmpty(); } public String getResourcePath() { @@ -269,65 +240,30 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, } public CubeSegment getFirstSegment() { - if (this.segments == null || this.segments.size() == 0) { - return null; - } else { - return this.segments.get(0); - } + return segments.getFirstSegment(); } public CubeSegment getLatestReadySegment() { - CubeSegment latest = null; - for (int i = segments.size() - 1; i >= 0; i--) { - CubeSegment seg = segments.get(i); - if (seg.getStatus() != SegmentStatusEnum.READY) - continue; - if (latest == null || latest.getDateRangeEnd() < seg.getDateRangeEnd()) { - latest = seg; - } - } - return latest; + return segments.getLatestReadySegment(); } public CubeSegment getLatestBuiltSegment() { - CubeSegment latest = null; - for (int i = segments.size() - 1; i >= 0; i--) { - CubeSegment seg = segments.get(i); - if (seg.getLastBuildTime() > 0) { - if (latest == null || seg.getLastBuildTime() > latest.getLastBuildTime()) - latest = seg; - } - } - return latest; + return segments.getLatestBuiltSegment(); } - - public List<CubeSegment> getSegments() { + + public Segments<CubeSegment> getSegments() { return segments; } - public List<CubeSegment> getSegments(SegmentStatusEnum status) { - List<CubeSegment> result = new ArrayList<CubeSegment>(); - - for (CubeSegment segment : segments) { - if (segment.getStatus() == status) { - result.add(segment); - } - } - - return result; + public Segments<CubeSegment> getSegments(SegmentStatusEnum status) { + return segments.getSegments(status); } public CubeSegment getSegment(String name, SegmentStatusEnum status) { - for (CubeSegment segment : segments) { - if ((null != segment.getName() && segment.getName().equals(name)) && (status == null || segment.getStatus() == status)) { - return segment; - } - } - - return null; + return segments.getSegment(name, status); } - public void setSegments(List<CubeSegment> segments) { + public void setSegments(Segments segments) { this.segments = segments; } @@ -393,30 +329,15 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, public Set<ColumnDesc> getAllColumnDescs() { return getDescriptor().listAllColumnDescs(); } - + @Override public long getDateRangeStart() { - List<CubeSegment> readySegs = getSegments(SegmentStatusEnum.READY); - - long startTime = Long.MAX_VALUE; - for (CubeSegment seg : readySegs) { - startTime = Math.min(startTime, seg.getDateRangeStart()); - } - - return startTime; + return segments.getDateRangeStart(); } @Override public long getDateRangeEnd() { - - List<CubeSegment> readySegs = getSegments(SegmentStatusEnum.READY); - - long endTime = Long.MIN_VALUE; - for (CubeSegment seg : readySegs) { - endTime = Math.max(endTime, seg.getDateRangeEnd()); - } - - return endTime; + return segments.getDateRangeEnd(); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/3ca53954/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 119a21a..4ba29af 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -56,6 +56,7 @@ import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.ProjectInstance; @@ -213,7 +214,6 @@ public class CubeManager implements IRealizationProvider { return result; } - public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, ReadableTable inpTable) throws IOException { CubeDesc cubeDesc = cubeSeg.getCubeDesc(); if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) @@ -222,18 +222,17 @@ public class CubeManager implements IRealizationProvider { String builderClass = cubeDesc.getDictionaryBuilderClass(col); DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(cubeDesc.getModel(), col, inpTable, builderClass); - saveDictionaryInfo(cubeSeg, col, dictInfo); return dictInfo; } - + public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, ReadableTable inpTable, Dictionary<String> dict) throws IOException { CubeDesc cubeDesc = cubeSeg.getCubeDesc(); if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) return null; DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(cubeDesc.getModel(), col, inpTable, dict); - + saveDictionaryInfo(cubeSeg, col, dictInfo); return dictInfo; } @@ -366,7 +365,7 @@ public class CubeManager implements IRealizationProvider { CubeInstance cube = update.getCubeInstance(); logger.info("Updating cube instance '" + cube.getName() + "'"); - List<CubeSegment> newSegs = Lists.newArrayList(cube.getSegments()); + Segments<CubeSegment> newSegs = (Segments) cube.getSegments().clone(); if (update.getToAddSegs() != null) newSegs.addAll(Arrays.asList(update.getToAddSegs())); @@ -385,7 +384,6 @@ public class CubeManager implements IRealizationProvider { } } } - } if (update.getToUpdateSegs() != null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/3ca53954/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 1ec01a2..1fc28eb 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -217,6 +217,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen this.inputRecordsSize = inputRecordsSize; } + @Override public long getLastBuildTime() { return lastBuildTime; } @@ -366,11 +367,12 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen } // date range is used in place of source offsets when offsets are missing - public boolean sourceOffsetContains(CubeSegment seg) { + @Override + public boolean sourceOffsetContains(ISegment seg) { if (isSourceOffsetsOn()) - return sourceOffsetStart <= seg.sourceOffsetStart && seg.sourceOffsetEnd <= sourceOffsetEnd; + return sourceOffsetStart <= ((CubeSegment) seg).sourceOffsetStart && ((CubeSegment) seg).sourceOffsetEnd <= sourceOffsetEnd; else - return dateRangeContains(seg); + return dateRangeContains(((CubeSegment) seg)); } public void validate() { http://git-wip-us.apache.org/repos/asf/kylin/blob/3ca53954/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java index f69ae3f..e97f4f4 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java @@ -18,7 +18,7 @@ package org.apache.kylin.metadata.model; -public interface ISegment { +public interface ISegment{ public String getName(); @@ -33,4 +33,8 @@ public interface ISegment { public DataModelDesc getModel(); public SegmentStatusEnum getStatus(); + + public long getLastBuildTime(); + + public boolean sourceOffsetContains(ISegment seg); } http://git-wip-us.apache.org/repos/asf/kylin/blob/3ca53954/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java new file mode 100644 index 0000000..104c2af --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.model; + +import java.util.ArrayList; + +public class Segments<T extends ISegment> extends ArrayList<T> { + + public T getFirstSegment() { + if (this == null || this.size() == 0) { + return null; + } else { + return this.get(0); + } + } + + public long getDateRangeStart() { + Segments<T> readySegs = getSegments(SegmentStatusEnum.READY); + + long startTime = Long.MAX_VALUE; + for (T seg : readySegs) { + startTime = Math.min(startTime, seg.getDateRangeStart()); + } + + return startTime; + } + + public long getDateRangeEnd() { + Segments<T> readySegs = getSegments(SegmentStatusEnum.READY); + + long endTime = Long.MIN_VALUE; + for (T seg : readySegs) { + endTime = Math.max(endTime, seg.getDateRangeEnd()); + } + + return endTime; + } + + public T getLatestReadySegment() { + T latest = null; + for (int i = this.size() - 1; i >= 0; i--) { + T seg = this.get(i); + if (seg.getStatus() != SegmentStatusEnum.READY) + continue; + if (latest == null || latest.getDateRangeEnd() < seg.getDateRangeEnd()) { + latest = seg; + } + } + return latest; + } + + public T getLatestBuiltSegment() { + T latest = null; + for (int i = this.size() - 1; i >= 0; i--) { + T seg = this.get(i); + if (seg.getLastBuildTime() > 0) { + if (latest == null || seg.getLastBuildTime() > latest.getLastBuildTime()) + latest = seg; + } + } + return latest; + } + + public Segments getSegments(SegmentStatusEnum status) { + Segments<T> result = new Segments<>(); + + for (T segment : this) { + if (segment.getStatus() == status) { + result.add(segment); + } + } + return result; + } + + public T getSegment(String name, SegmentStatusEnum status) { + for (T segment : this) { + if ((null != segment.getName() && segment.getName().equals(name)) && (status == null || segment.getStatus() == status)) { + return segment; + } + } + return null; + } + + public Segments getBuildingSegments() { + Segments<T> buildingSegments = new Segments(); + if (null != this) { + for (T segment : this) { + if (SegmentStatusEnum.NEW == segment.getStatus() || SegmentStatusEnum.READY_PENDING == segment.getStatus()) { + buildingSegments.add(segment); + } + } + } + return buildingSegments; + } + + public Segments getMergingSegments(T mergedSegment) { + Segments<T> result = new Segments(); + if (mergedSegment == null) + return result; + + for (T seg : this) { + if (seg.getStatus() != SegmentStatusEnum.READY && seg.getStatus() != SegmentStatusEnum.READY_PENDING) + continue; + + if (seg == mergedSegment) + continue; + + if (mergedSegment.sourceOffsetContains(seg)) { + // make sure no holes + if (result.size() > 0 && result.getLast().getSourceOffsetEnd() != seg.getSourceOffsetStart()) + throw new IllegalStateException("Merging segments must not have holes between " + result.getLast() + " and " + seg); + + result.add(seg); + } + } + return result; + } + + private T getLast() { + assert this.size() != 0; + return this.get(this.size() - 1); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/3ca53954/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java index 8d0cb82..61c73d5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java @@ -18,7 +18,6 @@ package org.apache.kylin.storage.hbase.util; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -45,6 +44,7 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IEngineAware; import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.project.RealizationEntry; @@ -189,7 +189,7 @@ public class ExtendCubeToHybridCLI { logger.info("CubeDesc was saved at: " + cubeDesc.getResourcePath()); // clear segments for old cube - cubeInstance.setSegments(new ArrayList<CubeSegment>()); + cubeInstance.setSegments(new Segments<CubeSegment>()); cubeInstance.setStatus(RealizationStatusEnum.DISABLED); store.putResource(cubeInstance.getResourcePath(), cubeInstance, CubeManager.CUBE_SERIALIZER); logger.info("CubeInstance was saved at: " + cubeInstance.getResourcePath()); http://git-wip-us.apache.org/repos/asf/kylin/blob/3ca53954/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java index 8bd5766..19e5db0 100644 --- a/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/ExtendCubeToHybridCLI.java @@ -18,7 +18,6 @@ package org.apache.kylin.tool; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -45,6 +44,7 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IEngineAware; import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.project.RealizationEntry; @@ -185,7 +185,7 @@ public class ExtendCubeToHybridCLI { logger.info("CubeDesc was saved at: " + cubeDesc.getResourcePath()); // clear segments for old cube - cubeInstance.setSegments(new ArrayList<CubeSegment>()); + cubeInstance.setSegments(new Segments()); cubeInstance.setStatus(RealizationStatusEnum.DISABLED); store.putResource(cubeInstance.getResourcePath(), cubeInstance, CubeManager.CUBE_SERIALIZER); logger.info("CubeInstance was saved at: " + cubeInstance.getResourcePath());