KYLIN-1962 reorg BuildCubeWithStream test case

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/859230d7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/859230d7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/859230d7
Branch: refs/heads/KYLIN-1726-2 Commit: 859230d787f9c218f56e56308897b68fb23d8dc4 Parents: ab5563a Author: shaofengshi <shaofeng...@apache.org> Authored: Mon Sep 26 18:10:32 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Sep 27 10:17:40 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 2 +- .../java/org/apache/kylin/cube/CubeManager.java | 20 +++ .../java/org/apache/kylin/cube/CubeSegment.java | 7 +- .../test_streaming_table_cube_desc.json | 17 ++- .../test_streaming_table_model_desc.json | 3 +- .../kylin/provision/BuildCubeWithStream.java | 121 +++++++++++------ .../kylin/provision/BuildCubeWithStream2.java | 134 ------------------- 7 files changed, 116 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 3e41055..838ef97 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -807,6 +807,6 @@ abstract public class KylinConfigBase implements Serializable { } public int getMaxBuildingSegments() { - return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "1")); + return Integer.parseInt(getOptional("kylin.cube.building.segment.max", "2")); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 463c8e9..962568c 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -29,10 +29,12 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; @@ -476,6 +478,24 @@ public class CubeManager implements IRealizationProvider { if (pair.getFirst() == false || pair.getSecond() == false) throw new IllegalArgumentException("The new refreshing segment " + newSegment + " does not match any existing segment in cube " + cube); + if (startOffset > 0 || endOffset > 0) { + CubeSegment toRefreshSeg = null; + for (CubeSegment cubeSegment : cube.getSegments()) { + if (cubeSegment.getSourceOffsetStart() == startOffset && cubeSegment.getSourceOffsetEnd() == endOffset) { + toRefreshSeg = cubeSegment; + break; + } + } + + if (toRefreshSeg == null) { + throw new IllegalArgumentException("For streaming cube, only one segment can be refreshed at one time"); + } + + Map<String, String> partitionInfo = Maps.newHashMap(); + partitionInfo.putAll(toRefreshSeg.getAdditionalInfo()); + newSegment.setAdditionalInfo(partitionInfo); + } + CubeUpdate cubeBuilder = new CubeUpdate(cube); cubeBuilder.setToAddSegs(newSegment); updateCube(cubeBuilder); http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index afb0d28..d5de47f 
100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -20,7 +20,6 @@ package org.apache.kylin.cube; import java.text.SimpleDateFormat; import java.util.Collection; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -106,7 +105,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen @JsonProperty("additionalInfo") @JsonInclude(JsonInclude.Include.NON_EMPTY) - private HashMap<String, String> additionalInfo = new LinkedHashMap<String, String>(); + private Map<String, String> additionalInfo = new LinkedHashMap<String, String>(); private volatile Map<Long, Short> cuboidBaseShards = Maps.newHashMap();//cuboid id ==> base(starting) shard for this cuboid @@ -543,11 +542,11 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegmen this.indexPath = indexPath; } - public HashMap<String, String> getAdditionalInfo() { + public Map<String, String> getAdditionalInfo() { return additionalInfo; } - public void setAdditionalInfo(HashMap<String, String> additionalInfo) { + public void setAdditionalInfo(Map<String, String> additionalInfo) { this.additionalInfo = additionalInfo; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json index 8279417..640b91c 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_streaming_table_cube_desc.json @@ -1,30 +1,29 @@ { "uuid" : "901ed15e-7769-4c66-b7ae-fbdc971cd192", - "name" : "test_streaming_table_cube_desc", "description" 
: "", "dimensions" : [ { - "name" : "DEFAULT.STREAMING_TABLE.SITE", + "name" : "SITE", "table" : "DEFAULT.STREAMING_TABLE", "column" : "SITE", "derived" : null }, { - "name" : "DEFAULT.STREAMING_TABLE.ITM", + "name" : "ITM", "table" : "DEFAULT.STREAMING_TABLE", "column" : "ITM", "derived" : null }, { - "name" : "TIME", + "name" : "DAY_START", "table" : "DEFAULT.STREAMING_TABLE", "column" : "DAY_START", "derived" : null }, { - "name" : "TIME", + "name" : "HOUR_START", "table" : "DEFAULT.STREAMING_TABLE", "column" : "HOUR_START", "derived" : null }, { - "name" : "TIME", + "name" : "MINUTE_START", "table" : "DEFAULT.STREAMING_TABLE", "column" : "MINUTE_START", "derived" : null @@ -68,13 +67,13 @@ } ], "rowkey" : { "rowkey_columns" : [ { - "column" : "DAY_START", + "column" : "MINUTE_START", "encoding" : "dict" }, { "column" : "HOUR_START", "encoding" : "dict" }, { - "column" : "MINUTE_START", + "column" : "DAY_START", "encoding" : "dict" }, { "column" : "SITE", @@ -107,7 +106,7 @@ } ], "override_kylin_properties": { "kylin.cube.algorithm": "inmem", - "kylin.cube.building.segment.max": "3" + "kylin.cube.building.segment.max": "5" }, "notify_list" : [ ], "status_need_notify" : [ ], http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json index e6977e1..23b10f7 100644 --- a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json +++ b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json @@ -1,5 +1,4 @@ { - "uuid": "ff527b94-f860-44c3-8452-93b177888732", "name": "test_streaming_table_model_desc", "dimensions": [ @@ -23,7 +22,7 @@ "fact_table": "DEFAULT.STREAMING_TABLE", "filter_condition": 
null, "partition_desc": { - "partition_date_column": "DEFAULT.STREAMING_TABLE.minute_start", + "partition_date_column": "DEFAULT.STREAMING_TABLE.MINUTE_START", "partition_date_start": 0, "partition_type": "APPEND" } http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index dfcedfb..23d7ca8 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -20,12 +20,21 @@ package org.apache.kylin.provision; import java.io.File; import java.io.IOException; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.TimeZone; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.requests.MetadataResponse; @@ -47,6 +56,7 @@ import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.job.streaming.Kafka10DataLoader; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -55,6 +65,8 @@ import 
org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.lang.Thread.sleep; + /** * for streaming cubing case "test_streaming_table" */ @@ -70,6 +82,7 @@ public class BuildCubeWithStream { private KafkaConfig kafkaConfig; private MockKafka kafkaServer; protected static boolean fastBuildMode = false; + private boolean generateData = true; public void before() throws Exception { deployEnv(); @@ -139,44 +152,91 @@ public class BuildCubeWithStream { public void build() throws Exception { clearSegment(cubeName); - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - long date1 = 0; - long date2 = f.parse("2013-01-01").getTime(); + new Thread(new Runnable() { + @Override + public void run() { + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + long dateStart = 0; + try { + dateStart = f.parse("2012-01-01").getTime(); + } catch (ParseException e) { + } + Random rand = new Random(); + while (generateData == true) { + long dateEnd = dateStart + 7 * 24 * 3600000; + try { + generateStreamData(dateStart, dateEnd, rand.nextInt(100)); + dateStart = dateEnd; + sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time + } catch (Exception e) { + e.printStackTrace(); + } + } + } + }).start(); + ExecutorService executorService = Executors.newCachedThreadPool(); + + List<FutureTask<ExecutableState>> futures = Lists.newArrayList(); + for (int i = 0; i < 5; i++) { + Thread.sleep(2 * 60 * 1000); // wait for new messages + FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() { + @Override + public ExecutableState call() { + ExecutableState result = null; + try { + result = buildSegment(cubeName, 0, Long.MAX_VALUE); + } catch (Exception e) { + // previous build hasn't been started, or other case. 
+ e.printStackTrace(); + } + + return result; + } + }); - int numberOfRecrods1 = 10000; - generateStreamData(date1, date2, numberOfRecrods1); - ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE); - Assert.assertTrue(result == ExecutableState.SUCCEED); + executorService.submit(futureTask); + futures.add(futureTask); + } - if (fastBuildMode == false) { - long date3 = f.parse("2013-04-01").getTime(); - int numberOfRecords2 = 5000; - generateStreamData(date2, date3, numberOfRecords2); - result = buildSegment(cubeName, 0, Long.MAX_VALUE); - Assert.assertTrue(result == ExecutableState.SUCCEED); + generateData = false; // stop generating message to kafka + executorService.shutdown(); + int succeedBuild = 0; + for (int i = 0; i < futures.size(); i++) { + ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES); + logger.info("Checking building task " + i + " whose state is " + result); + Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED ); + if (result == ExecutableState.SUCCEED) + succeedBuild++; + } + + logger.info(succeedBuild + " build jobs have been successfully completed."); + List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY); + Assert.assertTrue(segments.size() == succeedBuild); + + if (fastBuildMode == false) { //empty build - result = buildSegment(cubeName, 0, Long.MAX_VALUE); + ExecutableState result = buildSegment(cubeName, 0, Long.MAX_VALUE); Assert.assertTrue(result == ExecutableState.DISCARDED); + long endOffset = segments.get(segments.size() - 1).getSourceOffsetEnd(); //merge - result = mergeSegment(cubeName, 0, 15000); + result = mergeSegment(cubeName, 0, endOffset); Assert.assertTrue(result == ExecutableState.SUCCEED); - List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(); + segments = cubeManager.getCube(cubeName).getSegments(); Assert.assertTrue(segments.size() == 1); CubeSegment toRefreshSeg = 
segments.get(0); - HashMap<String, String> partitionOffsetMap = toRefreshSeg.getAdditionalInfo(); - refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd(), partitionOffsetMap); + refreshSegment(cubeName, toRefreshSeg.getSourceOffsetStart(), toRefreshSeg.getSourceOffsetEnd()); segments = cubeManager.getCube(cubeName).getSegments(); Assert.assertTrue(segments.size() == 1); } - } + private ExecutableState mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception { CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, false); DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); @@ -185,14 +245,8 @@ public class BuildCubeWithStream { return job.getStatus(); } - private String refreshSegment(String cubeName, long startOffset, long endOffset, HashMap<String, String> partitionOffsetMap) throws Exception { + private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception { CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); - segment.setAdditionalInfo(partitionOffsetMap); - CubeInstance cubeInstance = cubeManager.getCube(cubeName); - CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); - cubeBuilder.setToUpdateSegs(segment); - cubeManager.updateCube(cubeBuilder); - segment = cubeManager.getCube(cubeName).getSegmentById(segment.getUuid()); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); @@ -209,8 +263,8 @@ public class BuildCubeWithStream { protected void deployEnv() throws IOException { DeployUtil.overrideJobJarLocations(); -// DeployUtil.initCliWorkDir(); -// DeployUtil.deployMetadata(); + // DeployUtil.initCliWorkDir(); + // DeployUtil.deployMetadata(); } public static void beforeClass() throws Exception { @@ -265,13 +319,4 @@ public class 
BuildCubeWithStream { } } - - protected int cleanupOldStorage() throws Exception { - String[] args = { "--delete", "true" }; - - // KapStorageCleanupCLI cli = new KapStorageCleanupCLI(); - // cli.execute(args); - return 0; - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/859230d7/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java deleted file mode 100644 index d8c857f..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream2.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.provision; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.List; -import java.util.Random; -import java.util.TimeZone; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Lists; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.Thread.sleep; - -/** - * for streaming cubing case "test_streaming_table", using multiple threads to build it concurrently. - */ -public class BuildCubeWithStream2 extends BuildCubeWithStream { - - private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream2.class); - private boolean generateData = true; - - @Override - public void build() throws Exception { - clearSegment(cubeName); - SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); - f.setTimeZone(TimeZone.getTimeZone("GMT")); - final long date1 = 0; - final long date2 = f.parse("2013-01-01").getTime(); - - new Thread(new Runnable() { - @Override - public void run() { - - Random rand = new Random(); - while (generateData == true) { - try { - generateStreamData(date1, date2, rand.nextInt(100)); - sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time, from 0 to 100 seconds - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - }).start(); - ExecutorService executorService = Executors.newFixedThreadPool(4); - - List<FutureTask<ExecutableState>> futures = Lists.newArrayList(); - for (int i = 0; i < 5; i++) { - Thread.sleep(2 * 60 * 1000); // sleep 2 mintues - FutureTask futureTask = new FutureTask(new 
Callable<ExecutableState>() { - @Override - public ExecutableState call() { - ExecutableState result = null; - try { - result = buildSegment(cubeName, 0, Long.MAX_VALUE); - } catch (Exception e) { - e.printStackTrace(); - } - - return result; - } - }); - - executorService.submit(futureTask); - futures.add(futureTask); - } - - generateData = false; // stop generating message to kafka - executorService.shutdown(); - int succeedBuild = 0; - for (int i = 0; i < futures.size(); i++) { - ExecutableState result = futures.get(i).get(20, TimeUnit.MINUTES); - logger.info("Checking building task " + i + " whose state is " + result); - Assert.assertTrue(result == null || result == ExecutableState.SUCCEED || result == ExecutableState.DISCARDED ); - if (result == ExecutableState.SUCCEED) - succeedBuild++; - } - - logger.info(succeedBuild + " build jobs have been successfully completed."); - List<CubeSegment> segments = cubeManager.getCube(cubeName).getSegments(SegmentStatusEnum.READY); - Assert.assertTrue(segments.size() == succeedBuild); - - } - - public static void main(String[] args) throws Exception { - try { - beforeClass(); - - BuildCubeWithStream2 buildCubeWithStream = new BuildCubeWithStream2(); - buildCubeWithStream.before(); - buildCubeWithStream.build(); - logger.info("Build is done"); - buildCubeWithStream.after(); - afterClass(); - logger.info("Going to exit"); - System.exit(0); - } catch (Exception e) { - logger.error("error", e); - System.exit(1); - } - - } - -}