This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 24042e2 KYLIN-3270: add integration test for optimize job 24042e2 is described below commit 24042e2209d85b0c8de98a86d9a573aff182d9c9 Author: Zhong <nju_y...@apache.org> AuthorDate: Thu Mar 1 20:08:01 2018 +0800 KYLIN-3270: add integration test for optimize job Signed-off-by: shaofengshi <shaofeng...@apache.org> --- .../java/org/apache/kylin/cube/CubeManager.java | 16 ++-- .../java/org/apache/kylin/cube/model/CubeDesc.java | 72 ++++++++-------- .../kylin/engine/mr/steps/CopyDictionaryStep.java | 2 +- .../kylin/engine/spark/SparkCubingByLayer.java | 3 +- .../kylin/provision/BuildCubeWithEngine.java | 97 ++++++++++++++++++---- 5 files changed, 132 insertions(+), 58 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index dc370e2..15bb676 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -348,6 +348,10 @@ public class CubeManager implements IRealizationProvider { cube.setCuboids(update.getCuboids()); } + if (update.getCuboidsRecommend() != null) { + cube.setCuboidsRecommend(update.getCuboidsRecommend()); + } + try { cube = crud.save(cube); } catch (IllegalStateException ise) { @@ -637,19 +641,21 @@ public class CubeManager implements IRealizationProvider { } public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException { - checkReadyForOptimize(cube); + CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy - List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY); + checkReadyForOptimize(cubeCopy); + + List<CubeSegment> readySegments = cubeCopy.getSegments(SegmentStatusEnum.READY); CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()]; int i = 0; for (CubeSegment segment : readySegments) { - CubeSegment 
newSegment = newSegment(cube, segment.getTSRange(), null); - validateNewSegments(cube, newSegment); + CubeSegment newSegment = newSegment(cubeCopy, segment.getTSRange(), null); + validateNewSegments(cubeCopy, newSegment); optimizeSegments[i++] = newSegment; } - CubeUpdate update = new CubeUpdate(cube); + CubeUpdate update = new CubeUpdate(cubeCopy); update.setCuboidsRecommend(cuboidsRecommend); update.setToAddSegs(optimizeSegments); updateCube(update); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index dbd8708..93c327d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -18,28 +18,16 @@ package org.apache.kylin.cube.model; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import java.lang.reflect.Method; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeSet; - +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import 
com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.ArrayUtils; @@ -77,16 +65,27 @@ import org.apache.kylin.metadata.realization.RealizationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import java.lang.reflect.Method; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; /** */ @@ -677,12 +676,18 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } private void initMandatoryCuboids() { + this.mandatoryCuboids.clear(); + this.mandatoryCuboids.addAll(generateMandatoryCuboids(this.mandatoryDimensionSetList)); + } + + public Set<Long> generateMandatoryCuboids(List<Set<String>> mandatoryDimensionSetList) { Map<String, RowKeyColDesc> rowKeyColDescMap = Maps.newHashMap(); for 
(RowKeyColDesc entry : getRowkey().getRowKeyColumns()) { rowKeyColDescMap.put(entry.getColumn(), entry); } - for (Set<String> mandatoryDimensionSet : this.mandatoryDimensionSetList) { + Set<Long> mandatoryCuboids = Sets.newHashSetWithExpectedSize(mandatoryDimensionSetList.size()); + for (Set<String> mandatoryDimensionSet : mandatoryDimensionSetList) { long cuboid = 0L; for (String columnName : mandatoryDimensionSet) { TblColRef tblColRef = model.findColumn(columnName); @@ -697,6 +702,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } mandatoryCuboids.add(cuboid); } + return mandatoryCuboids; } public CuboidScheduler getInitialCuboidScheduler() { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java index c0b3c99..dbabc12 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java @@ -44,7 +44,7 @@ public class CopyDictionaryStep extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final CubeManager mgr = CubeManager.getInstance(context.getConfig()); - final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite(); final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment); diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java index 0d26815..714991d 100644 --- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -45,7 +45,6 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.common.RowKeySplitter; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; -import org.apache.kylin.cube.cuboid.CuboidUtil; import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.apache.kylin.cube.model.CubeDesc; @@ -184,7 +183,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr); } - final int totalLevels = CuboidUtil.getLongestDepth(cubeSegment.getCuboidScheduler().getAllCuboidIds()); + final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel(); JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1]; int level = 0; int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig); diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index ce7b892..afd9788 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -18,20 +18,9 @@ package org.apache.kylin.provision; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Method; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - +import 
com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,12 +34,15 @@ import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.job.DeployUtil; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.CheckpointExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; @@ -63,7 +55,21 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class BuildCubeWithEngine { @@ -171,7 +177,8 @@ public class BuildCubeWithEngine { } cubeManager = CubeManager.getInstance(kylinConfig); for (String jobId : jobService.getAllJobIds()) { - if (jobService.getJob(jobId) instanceof CubingJob) { + 
AbstractExecutable executable = jobService.getJob(jobId); + if (executable instanceof CubingJob || executable instanceof CheckpointExecutable) { jobService.deleteJob(jobId); } } @@ -228,7 +235,7 @@ public class BuildCubeWithEngine { for (int i = 0; i < tasks.size(); ++i) { Future<Boolean> task = tasks.get(i); final Boolean result = task.get(); - if (result == false) { + if (!result) { throw new RuntimeException("The test '" + testCase[i] + "' is failed."); } } @@ -299,6 +306,8 @@ public class BuildCubeWithEngine { return false; if (!buildSegment(cubeName, date2, date3)) return false; + if (!optimizeCube(cubeName)) + return false; if (!buildSegment(cubeName, date3, date4)) return false; if (!buildSegment(cubeName, date4, date5)) // one empty segment @@ -339,6 +348,24 @@ public class BuildCubeWithEngine { cubeManager.updateCubeDropSegments(cube, cube.getSegments()); } + private Boolean optimizeCube(String cubeName) throws Exception { + CubeInstance cubeInstance = cubeManager.getCube(cubeName); + Set<Long> cuboidsRecommend = mockRecommendCuboids(cubeInstance, 0.05, 255); + CubeSegment[] optimizeSegments = cubeManager.optimizeSegments(cubeInstance, cuboidsRecommend); + List<AbstractExecutable> optimizeJobList = Lists.newArrayListWithExpectedSize(optimizeSegments.length); + for (CubeSegment optimizeSegment : optimizeSegments) { + DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, "TEST"); + jobService.addJob(optimizeJob); + optimizeJobList.add(optimizeJob); + optimizeSegment.setLastBuildJobID(optimizeJob.getId()); + } + CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cubeInstance, "TEST").build(); + checkpointJob.addTaskListForCheck(optimizeJobList); + jobService.addJob(checkpointJob); + ExecutableState state = waitForJob(checkpointJob.getId()); + return Boolean.valueOf(ExecutableState.SUCCEED == state); + } + private Boolean mergeSegment(String cubeName, long startDate, long endDate) throws Exception 
{ CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), new TSRange(startDate, endDate), null, true); DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); @@ -356,6 +383,42 @@ public class BuildCubeWithEngine { return Boolean.valueOf(ExecutableState.SUCCEED == state); } + private Set<Long> mockRecommendCuboids(CubeInstance cubeInstance, double maxRatio, int maxNumber) { + Preconditions.checkArgument(maxRatio > 0.0 && maxRatio < 1.0); + Preconditions.checkArgument(maxNumber > 0); + Set<Long> cuboidsRecommend; + Random rnd = new Random(); + + // add some mandatory cuboids which are for other unit test + // - org.apache.kylin.query.ITCombinationTest.testLimitEnabled + // - org.apache.kylin.query.ITFailfastQueryTest.testPartitionNotExceedMaxScanBytes + // - org.apache.kylin.query.ITFailfastQueryTest.testQueryNotExceedMaxScanBytes + List<Set<String>> mandatoryDimensionSetList = Lists.newLinkedList(); + mandatoryDimensionSetList.add(Sets.newHashSet("CAL_DT")); + mandatoryDimensionSetList.add(Sets.newHashSet("seller_id", "CAL_DT")); + mandatoryDimensionSetList.add(Sets.newHashSet("LSTG_FORMAT_NAME", "slr_segment_cd")); + Set<Long> mandatoryCuboids = cubeInstance.getDescriptor().generateMandatoryCuboids(mandatoryDimensionSetList); + + CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler(); + Set<Long> cuboidsCurrent = cuboidScheduler.getAllCuboidIds(); + long baseCuboid = cuboidScheduler.getBaseCuboidId(); + do { + cuboidsRecommend = Sets.newHashSet(); + cuboidsRecommend.add(baseCuboid); + cuboidsRecommend.addAll(mandatoryCuboids); + for (long i = 1; i < baseCuboid; i++) { + if (rnd.nextDouble() < maxRatio) { // add 5% cuboids + cuboidsRecommend.add(i); + } + if (cuboidsRecommend.size() > maxNumber) { + break; + } + } + } while (cuboidsRecommend.equals(cuboidsCurrent)); + + return cuboidsRecommend; + } + @SuppressWarnings("unused") private int cleanupOldStorage() throws Exception { String[] args = { 
"--delete", "true" }; -- To stop receiving notification emails like this one, please contact shaofeng...@apache.org.