Stabilize BuildCubeWithStream
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0c6441c7 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0c6441c7 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0c6441c7 Branch: refs/heads/yang22-cdh5.7 Commit: 0c6441c710ddefaee6285d41f4eb20e06533c9f8 Parents: 5095596 Author: shaofengshi <shaofeng...@apache.org> Authored: Wed Mar 15 12:02:52 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Mar 15 12:02:52 2017 +0800 ---------------------------------------------------------------------- .../kylin/provision/BuildCubeWithStream.java | 76 ++++++++++++++++---- .../kylin/storage/hbase/util/ZookeeperUtil.java | 52 ++++++++++++++ 2 files changed, 115 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6441c7/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 8abb84c..030b7d6 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -36,6 +36,10 @@ import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.lang3.StringUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; @@ -63,6 +67,7 @@ import 
org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; +import org.apache.kylin.storage.hbase.util.ZookeeperUtil; import org.apache.kylin.tool.StorageCleanupJob; import org.junit.Assert; import org.slf4j.Logger; @@ -84,8 +89,14 @@ public class BuildCubeWithStream { private KafkaConfig kafkaConfig; private MockKafka kafkaServer; + private ZkConnection zkConnection; + private final String kafkaZkPath = "/" + UUID.randomUUID().toString(); + protected static boolean fastBuildMode = false; - private boolean generateData = true; + private volatile boolean generateData = true; + private volatile boolean generateDataDone = false; + + private static final int BUILD_ROUND = 5; public void before() throws Exception { deployEnv(); @@ -126,8 +137,9 @@ public class BuildCubeWithStream { private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) { //Start mock Kakfa - String zkConnectionStr = "sandbox:2181"; - ZkConnection zkConnection = new ZkConnection(zkConnectionStr); + String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath; + System.out.println("zkConnectionStr" + zkConnectionStr); + zkConnection = new ZkConnection(zkConnectionStr); // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState()); kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId()); kafkaServer.start(); @@ -171,18 +183,33 @@ public class BuildCubeWithStream { try { generateStreamData(dateStart, dateEnd, rand.nextInt(100)); dateStart = dateEnd; - sleep(rand.nextInt(rand.nextInt(100 * 1000))); // wait random time + sleep(rand.nextInt(rand.nextInt(30)) * 1000); // wait random time } catch (Exception e) { e.printStackTrace(); } } + generateDataDone = true; } }).start(); ExecutorService executorService = Executors.newCachedThreadPool(); 
List<FutureTask<ExecutableState>> futures = Lists.newArrayList(); - for (int i = 0; i < 5; i++) { - Thread.sleep(2 * 60 * 1000); // wait for new messages + for (int i = 0; i < BUILD_ROUND; i++) { + if (i == (BUILD_ROUND - 1)) { + // stop generating message to kafka + generateData = false; + int waittime = 0; + while (generateDataDone == false && waittime < 100) { + Thread.sleep(1000); + waittime++; + } + if (generateDataDone == false) { + throw new IllegalStateException("Timeout when wait all messages be sent to Kafka"); // ensure all messages have been flushed. + } + } else { + Thread.sleep(30 * 1000); // wait for new messages + } + FutureTask futureTask = new FutureTask(new Callable<ExecutableState>() { @Override public ExecutableState call() { @@ -202,7 +229,7 @@ public class BuildCubeWithStream { futures.add(futureTask); } - generateData = false; // stop generating message to kafka + generateData = false; executorService.shutdown(); int succeedBuild = 0; for (int i = 0; i < futures.size(); i++) { @@ -265,8 +292,8 @@ public class BuildCubeWithStream { protected void deployEnv() throws IOException { DeployUtil.overrideJobJarLocations(); - // DeployUtil.initCliWorkDir(); - // DeployUtil.deployMetadata(); + // DeployUtil.initCliWorkDir(); + // DeployUtil.deployMetadata(); } public static void beforeClass() throws Exception { @@ -274,16 +301,31 @@ public class BuildCubeWithStream { ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { - throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); + throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169"); } 
HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); } public void after() { kafkaServer.stop(); + cleanKafkaZkPath(kafkaZkPath); DefaultScheduler.destroyInstance(); } + private void cleanKafkaZkPath(String path) { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ZookeeperUtil.getZKConnectString(), retryPolicy); + zkClient.start(); + + try { + zkClient.delete().deletingChildrenIfNeeded().forPath(kafkaZkPath); + } catch (Exception e) { + logger.warn("Failed to delete zookeeper path: " + path, e); + } finally { + zkClient.close(); + } + } + protected void waitForJob(String jobId) { while (true) { AbstractExecutable job = jobService.getJob(jobId); @@ -311,6 +353,9 @@ public class BuildCubeWithStream { } public static void main(String[] args) throws Exception { + long start = System.currentTimeMillis(); + int exitCode = 0; + BuildCubeWithStream buildCubeWithStream = null; try { beforeClass(); @@ -318,13 +363,18 @@ public class BuildCubeWithStream { buildCubeWithStream.before(); buildCubeWithStream.build(); logger.info("Build is done"); + + buildCubeWithStream.after(); buildCubeWithStream.cleanup(); logger.info("Going to exit"); - System.exit(0); } catch (Throwable e) { logger.error("error", e); - System.exit(1); + exitCode = 1; } + long millis = System.currentTimeMillis() - start; + System.out.println("Time elapsed: " + (millis / 1000) + " sec - in " + BuildCubeWithStream.class.getName()); + + System.exit(exitCode); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/0c6441c7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java new file mode 
100644 index 0000000..b5ebe89 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.util; + +import java.util.Arrays; + +import javax.annotation.Nullable; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.kylin.storage.hbase.HBaseConnection; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +public class ZookeeperUtil { + + /** + * Get zookeeper connection string from HBase Configuration + * + * @return Zookeeper Connection string + */ + public static String getZKConnectString() { + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); + final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); + return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { + @Nullable + @Override + public String apply(String input) { + return input + ":" + port; + } + }), ","); + } +}