KYLIN-2059 Concurrent build issue in CubeManager.calculateToBeSegments()
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/334c2e09
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/334c2e09
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/334c2e09
Branch: refs/heads/orderedbytes Commit: 334c2e09ba443b8fdcb7d4bfe08ab8fbc0ac3fbe Parents: aa51ce0 Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Sep 29 22:56:00 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Thu Oct 6 14:44:05 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../java/org/apache/kylin/cube/CubeManager.java | 77 ++++++++------- .../org/apache/kylin/cube/CubeManagerTest.java | 99 ++++++++++++++++++++ 3 files changed, 145 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/334c2e09/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 838ef97..4d1639b 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -809,4 +809,8 @@ abstract public class KylinConfigBase implements Serializable { public int getMaxBuildingSegments() { return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "2")); } + + public void setMaxBuildingSegments(int maxBuildingSegments) { + setProperty("kylin.cube.building.segment.max", String.valueOf(maxBuildingSegments)); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/334c2e09/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 962568c..d243f4d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -135,7 +135,7 @@ public class CubeManager implements IRealizationProvider { logger.info("Initializing CubeManager with config " + config); this.config = config; this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube"); - + // touch lower level metadata before registering my listener loadAllCubeInstance(); Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube"); @@ -159,12 +159,12 @@ public class CubeManager implements IRealizationProvider { @Override public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { String cubeName = cacheKey; - + if (event == Event.DROP) removeCubeLocal(cubeName); else reloadCubeLocal(cubeName); - + for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.CUBE, cubeName)) { broadcaster.notifyProjectDataUpdate(prj.getName()); } @@ -615,7 +615,6 @@ public class CubeManager implements IRealizationProvider { return max; } - private long calculateStartOffsetForAppendSegment(CubeInstance cube) { List<CubeSegment> existing = cube.getSegments(); if (existing.isEmpty()) { @@ -625,7 +624,6 @@ public class CubeManager implements IRealizationProvider { } } - private long calculateStartDateForAppendSegment(CubeInstance cube) { List<CubeSegment> existing = cube.getSegments(); if (existing.isEmpty()) { @@ -728,7 +726,7 @@ public class CubeManager implements IRealizationProvider { List<CubeSegment> mergingSegs = Lists.newArrayList(); if (buildingSegs.size() > 0) { - + for (CubeSegment building : buildingSegs) { // exclude those under-merging segs for (CubeSegment ready : readySegs) { @@ -760,27 +758,22 @@ public class CubeManager implements IRealizationProvider { return null; } - public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... 
newSegments) throws IOException { - List<CubeSegment> tobe = calculateToBeSegments(cube); + public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException { + if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier())) + throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier"); - for (CubeSegment seg : newSegments) { - if (tobe.contains(seg) == false) - throw new IllegalStateException("For cube " + cube + ", segment " + seg + " is expected but not in the tobe " + tobe); + if (StringUtils.isBlank(newSegment.getLastBuildJobID())) + throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID"); - if (StringUtils.isBlank(seg.getStorageLocationIdentifier())) - throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier"); + if (isReady(newSegment) == true) + throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY"); - if (StringUtils.isBlank(seg.getLastBuildJobID())) - throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID"); + List<CubeSegment> tobe = calculateToBeSegments(cube, newSegment); - seg.setStatus(SegmentStatusEnum.READY); - } + if (tobe.contains(newSegment) == false) + throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe); - for (CubeSegment seg : tobe) { - if (isReady(seg) == false) { - logger.warn("For cube " + cube + ", segment " + seg + " isn't READY yet."); - } - } + newSegment.setStatus(SegmentStatusEnum.READY); List<CubeSegment> toRemoveSegs = Lists.newArrayList(); for (CubeSegment segment : cube.getSegments()) { @@ -788,14 +781,14 @@ public class CubeManager implements IRealizationProvider { toRemoveSegs.add(segment); } - logger.info("Promoting cube " + cube + ", new segments " + 
Arrays.toString(newSegments) + ", to remove segments " + toRemoveSegs); + logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs); CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegments).setStatus(RealizationStatusEnum.READY); + cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY); updateCube(cubeBuilder); } - public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) { + public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) { List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments); List<CubeSegment> newList = Arrays.asList(newSegments); if (tobe.containsAll(newList) == false) { @@ -809,11 +802,12 @@ public class CubeManager implements IRealizationProvider { * - Favors new segments over the old * - Favors big segments over the small */ - private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment... 
newSegments) { + private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment newSegments) { List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments()); - if (newSegments != null) - tobe.addAll(Arrays.asList(newSegments)); + if (newSegments != null && !tobe.contains(newSegments)) { + tobe.add(newSegments); + } if (tobe.size() == 0) return tobe; @@ -849,13 +843,17 @@ public class CubeManager implements IRealizationProvider { } else { tobe.remove(j); } - } else if (isNew(is)) { - // otherwise, favor the new segment - tobe.remove(j); + continue; } else { - tobe.remove(i); + // otherwise, favor the new segment + if (isNew(is) && is.equals(newSegments)) { + tobe.remove(j); + continue; + } else if (js.equals(newSegments)) { + tobe.remove(i); + continue; + } } - continue; } // if i, j in sequence @@ -865,8 +863,17 @@ public class CubeManager implements IRealizationProvider { continue; } - // seems j not fitting - tobe.remove(j); + // js can be covered by is + if (is.equals(newSegments)) { + // seems j not fitting + tobe.remove(j); + continue; + } else { + i++; + j++; + continue; + } + } return tobe; http://git-wip-us.apache.org/repos/asf/kylin/blob/334c2e09/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java index 49bb128..e63fe99 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java @@ -19,6 +19,7 @@ package org.apache.kylin.cube; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -123,6 +124,104 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { } + + @Test + public void 
testConcurrentBuildAndMerge() throws Exception { + CubeManager mgr = CubeManager.getInstance(getTestConfig()); + CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + getTestConfig().setMaxBuildingSegments(10); + // no segment at first + assertEquals(0, cube.getSegments().size()); + + // append first + CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000); + seg1.setStatus(SegmentStatusEnum.READY); + + CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000); + seg2.setStatus(SegmentStatusEnum.READY); + + + CubeSegment seg3 = mgr.mergeSegments(cube, 0, 0, 0000, 2000, true); + seg3.setStatus(SegmentStatusEnum.NEW); + + + CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 2000, 3000); + seg4.setStatus(SegmentStatusEnum.NEW); + seg4.setLastBuildJobID("test"); + seg4.setStorageLocationIdentifier("test"); + + CubeSegment seg5 = mgr.appendSegment(cube, 0, 0, 3000, 4000); + seg5.setStatus(SegmentStatusEnum.READY); + + CubeUpdate cubeBuilder = new CubeUpdate(cube); + + mgr.updateCube(cubeBuilder); + + + mgr.promoteNewlyBuiltSegments(cube, seg4); + + assertTrue(cube.getSegments().size() == 5); + + assertTrue(cube.getSegmentById(seg1.getUuid()) != null && cube.getSegmentById(seg1.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg2.getUuid()) != null && cube.getSegmentById(seg2.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.NEW); + assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg5.getUuid()) != null && cube.getSegmentById(seg5.getUuid()).getStatus() == SegmentStatusEnum.READY); + + } + + + @Test + public void testConcurrentMergeAndMerge() throws Exception { + CubeManager mgr = CubeManager.getInstance(getTestConfig()); + getTestConfig().setMaxBuildingSegments(10); + 
CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty"); + + // no segment at first + assertEquals(0, cube.getSegments().size()); + + // append first + CubeSegment seg1 = mgr.appendSegment(cube, 0, 0, 0, 1000); + seg1.setStatus(SegmentStatusEnum.READY); + + CubeSegment seg2 = mgr.appendSegment(cube, 0, 0, 1000, 2000); + seg2.setStatus(SegmentStatusEnum.READY); + + CubeSegment seg3 = mgr.appendSegment(cube, 0, 0, 2000, 3000); + seg3.setStatus(SegmentStatusEnum.READY); + + CubeSegment seg4 = mgr.appendSegment(cube, 0, 0, 3000, 4000); + seg4.setStatus(SegmentStatusEnum.READY); + + + + CubeSegment merge1 = mgr.mergeSegments(cube, 0, 0, 0, 2000, true); + merge1.setStatus(SegmentStatusEnum.NEW); + merge1.setLastBuildJobID("test"); + merge1.setStorageLocationIdentifier("test"); + + CubeSegment merge2 = mgr.mergeSegments(cube, 0, 0, 2000, 4000, true); + merge2.setStatus(SegmentStatusEnum.NEW); + merge2.setLastBuildJobID("test"); + merge2.setStorageLocationIdentifier("test"); + + CubeUpdate cubeBuilder = new CubeUpdate(cube); + mgr.updateCube(cubeBuilder); + + + mgr.promoteNewlyBuiltSegments(cube, merge1); + + assertTrue(cube.getSegments().size() == 4); + + assertTrue(cube.getSegmentById(seg1.getUuid()) == null); + assertTrue(cube.getSegmentById(seg2.getUuid()) == null); + assertTrue(cube.getSegmentById(merge1.getUuid()) != null && cube.getSegmentById(merge1.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg3.getUuid()) != null && cube.getSegmentById(seg3.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(seg4.getUuid()) != null && cube.getSegmentById(seg4.getUuid()).getStatus() == SegmentStatusEnum.READY); + assertTrue(cube.getSegmentById(merge2.getUuid()) != null && cube.getSegmentById(merge2.getUuid()).getStatus() == SegmentStatusEnum.NEW); + + } + @Test public void testGetAllCubes() throws Exception { final ResourceStore store = ResourceStore.getStore(getTestConfig());