Fix some test cases that cannot be run concurrently Signed-off-by: lidongsjtu <lid...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f2e8b690 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f2e8b690 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f2e8b690 Branch: refs/heads/KYLIN-2428 Commit: f2e8b690f186abe5048dee8a6b0339c2a28c0594 Parents: 3ee4946 Author: etherge <ethe...@163.com> Authored: Wed Feb 8 23:50:35 2017 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Thu Feb 9 22:05:11 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 16 +++--- .../apache/kylin/common/KylinConfigBase.java | 7 ++- .../apache/kylin/common/KylinConfigTest.java | 17 ++++++- .../apache/kylin/dict/CachedTreeMapTest.java | 7 ++- .../kylin/job/BaseTestDistributedScheduler.java | 29 ++--------- .../kylin/provision/BuildCubeWithStream.java | 32 ++++++++++-- .../org/apache/kylin/provision/MockKafka.java | 11 ++--- .../hbase/util/ZookeeperDistributedJobLock.java | 24 +-------- .../kylin/storage/hbase/util/ZookeeperUtil.java | 52 ++++++++++++++++++++ 9 files changed, 124 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index e8c7fae..fdcd52c 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -56,6 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.io.Files; public class DeployUtil { private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class); @@ 
-139,7 +140,7 @@ public class DeployUtil { boolean buildCubeUsingProvidedData = Boolean.parseBoolean(System.getProperty("buildCubeUsingProvidedData")); if (!buildCubeUsingProvidedData) { System.out.println("build cube with random dataset"); - + // data is generated according to cube descriptor and saved in resource store MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); DataModelDesc model = mgr.getDataModelDesc(modelName); @@ -208,12 +209,12 @@ public class DeployUtil { MetadataManager metaMgr = MetadataManager.getInstance(config()); // scp data files, use the data from hbase, instead of local files - File temp = File.createTempFile("temp", ".csv"); - temp.createNewFile(); + File tempDir = Files.createTempDir(); + String tempDirAbsPath = tempDir.getAbsolutePath(); for (String tablename : TABLE_NAMES) { tablename = tablename.toUpperCase(); - File localBufferFile = new File(temp.getParent() + "/" + tablename + ".csv"); + File localBufferFile = new File(tempDirAbsPath + "/" + tablename + ".csv"); localBufferFile.createNewFile(); InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv").inputStream; @@ -225,8 +226,7 @@ public class DeployUtil { localBufferFile.deleteOnExit(); } - String tableFileDir = temp.getParent(); - temp.delete(); + tempDir.deleteOnExit(); IHiveClient hiveClient = HiveClientFactory.getHiveClient(); // create hive tables @@ -238,7 +238,7 @@ public class DeployUtil { // load data to hive tables // LOAD DATA LOCAL INPATH 'filepath' [OVERWRITE] INTO TABLE tablename for (String tablename : TABLE_NAMES) { - hiveClient.executeHQL(generateLoadDataHql(tablename.toUpperCase(), tableFileDir)); + hiveClient.executeHQL(generateLoadDataHql(tablename.toUpperCase(), tempDirAbsPath)); } final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); @@ -255,7 +255,7 @@ public class DeployUtil { String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity(); String dropsql2 = "DROP VIEW IF EXISTS 
" + tableDesc.getIdentity(); - + StringBuilder ddl = new StringBuilder(); ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n"); ddl.append("(" + "\n"); http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index ebd9dfc..dce4149 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -216,12 +216,11 @@ abstract public class KylinConfigBase implements Serializable { String metadataUrl = getMetadataUrl(); String defaultPrefix = "kylin_metadata"; - if (metadataUrl.endsWith("@hbase")) { - int cut = metadataUrl.lastIndexOf('@'); + int cut = metadataUrl.lastIndexOf('@'); + if (cut > 0) { return metadataUrl.substring(0, cut); - } else { - return defaultPrefix; } + return defaultPrefix; } public String[] getRealizationProviders() { http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java index 4d5f130..7e4b444 100644 --- a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java @@ -31,7 +31,7 @@ import org.junit.Test; import com.google.common.collect.Maps; -public class KylinConfigTest extends HotLoadKylinPropertiesTestCase{ +public class KylinConfigTest extends HotLoadKylinPropertiesTestCase { @Test public void testMRConfigOverride() { KylinConfig config = 
KylinConfig.getInstanceFromEnv(); @@ -81,4 +81,19 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase{ assertEquals("ky...@kylin.apache.org", config.getKylinOwner()); } + + @Test + public void testGetMetadataUrlPrefix() { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + final String default_metadata_prefix = "kylin_metadata"; + + config.setMetadataUrl("testMetaPrefix@hbase"); + assertEquals("testMetaPrefix", config.getMetadataUrlPrefix()); + + config.setMetadataUrl("testMetaPrefix@hdfs"); + assertEquals("testMetaPrefix", config.getMetadataUrlPrefix()); + + config.setMetadataUrl("/kylin/temp"); + assertEquals(default_metadata_prefix, config.getMetadataUrlPrefix()); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java index ccf6e24..3c29d9c 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java @@ -30,6 +30,7 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.util.UUID; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -114,8 +115,10 @@ public class CachedTreeMapTest { } } - public static final String baseDir = "/tmp/kylin_cachedtreemap_test/"; - public static final String workingDir = "/tmp/kylin_cachedtreemap_test/working"; + + static final UUID uuid = UUID.randomUUID(); + static final String baseDir = "/tmp/kylin_cachedtreemap_test/" + uuid; + static final String workingDir = baseDir + "/working"; private static void cleanup() { Path basePath = new Path(baseDir); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java index 2f37a50..2d79970 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java @@ -20,9 +20,7 @@ package org.apache.kylin.job; import java.io.File; import java.nio.charset.Charset; -import java.util.Arrays; - -import javax.annotation.Nullable; +import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; @@ -31,8 +29,6 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.job.engine.JobEngineConfig; @@ -40,15 +36,13 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DistributedScheduler; -import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock; +import org.apache.kylin.storage.hbase.util.ZookeeperUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; 
import com.google.common.io.Files; public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { @@ -62,8 +56,8 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { static File localMetaDir; static final String SEGMENT_ID = "segmentId"; - static final String segmentId1 = "segmentId1"; - static final String segmentId2 = "segmentId2"; + static final String segmentId1 = "seg1" + UUID.randomUUID(); + static final String segmentId2 = "seg2" + UUID.randomUUID(); static final String serverName1 = "serverName1"; static final String serverName2 = "serverName2"; static final String confDstPath1 = "target/kylin_metadata_dist_lock_test1/kylin.properties"; @@ -177,7 +171,7 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { } private static void initZk() { - String zkConnectString = getZKConnectString(); + String zkConnectString = ZookeeperUtil.getZKConnectString(); if (StringUtils.isEmpty(zkConnectString)) { throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!"); } @@ -186,19 +180,6 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { zkClient.start(); } - private static String getZKConnectString() { - Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); - final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); - final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); - return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { - @Nullable - @Override - public String apply(String input) { - return input + ":" + port; - } - }), ","); - } - String getServerName(String cubeName) { String lockPath = getLockPath(cubeName); String serverName = null; http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 53c89cf..f3b1ec9 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -36,6 +36,10 @@ import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.lang3.StringUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; @@ -63,6 +67,7 @@ import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; +import org.apache.kylin.storage.hbase.util.ZookeeperUtil; import org.apache.kylin.tool.StorageCleanupJob; import org.junit.Assert; import org.slf4j.Logger; @@ -84,6 +89,9 @@ public class BuildCubeWithStream { private KafkaConfig kafkaConfig; private MockKafka kafkaServer; + private ZkConnection zkConnection; + private final String kafkaZkPath = "/" + UUID.randomUUID().toString(); + protected static boolean fastBuildMode = false; private boolean generateData = true; @@ -128,8 +136,9 @@ public class BuildCubeWithStream { private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) { //Start mock Kakfa - String zkConnectionStr = "sandbox:2181"; - ZkConnection zkConnection = new ZkConnection(zkConnectionStr); + String zkConnectionStr = ZookeeperUtil.getZKConnectString() + kafkaZkPath; + System.out.println("zkConnectionStr" + zkConnectionStr); + zkConnection = new 
ZkConnection(zkConnectionStr); // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState()); kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId()); kafkaServer.start(); @@ -287,9 +296,24 @@ public class BuildCubeWithStream { public void after() { kafkaServer.stop(); + cleanKafkaZkPath(kafkaZkPath); DefaultScheduler.destroyInstance(); } + private void cleanKafkaZkPath(String path) { + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ZookeeperUtil.getZKConnectString(), retryPolicy); + zkClient.start(); + + try { + zkClient.delete().deletingChildrenIfNeeded().forPath(kafkaZkPath); + } catch (Exception e) { + logger.warn("Failed to delete zookeeper path: " + path, e); + } finally { + zkClient.close(); + } + } + protected void waitForJob(String jobId) { while (true) { AbstractExecutable job = jobService.getJob(jobId); @@ -327,6 +351,8 @@ public class BuildCubeWithStream { buildCubeWithStream.before(); buildCubeWithStream.build(); logger.info("Build is done"); + + buildCubeWithStream.after(); buildCubeWithStream.cleanup(); logger.info("Going to exit"); } catch (Throwable e) { @@ -336,7 +362,7 @@ public class BuildCubeWithStream { long millis = System.currentTimeMillis() - start; System.out.println("Time elapsed: " + (millis / 1000) + " sec - in " + BuildCubeWithStream.class.getName()); - + System.exit(exitCode); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java index 3f47923..fce422a 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java @@ -29,6 +29,8 @@ 
import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; import org.apache.kafka.common.requests.MetadataResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import kafka.admin.AdminUtils; import kafka.server.KafkaConfig; @@ -52,6 +54,7 @@ public class MockKafka { } private KafkaServerStartable kafkaServer; + private static final Logger logger = LoggerFactory.getLogger(MockKafka.class); private ZkConnection zkConnection; @@ -67,7 +70,7 @@ public class MockKafka { public MockKafka(ZkConnection zkServerConnection, int port, int brokerId) { this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), String.valueOf(port), String.valueOf(brokerId)); - start(); + //start(); } private MockKafka(ZkConnection zkServerConnection, String logDir, String port, String brokerId) { @@ -110,13 +113,9 @@ public class MockKafka { zkClient.close(); } - public String getConnectionString() { - return String.format("%s:%d", kafkaServer.serverConfig().hostName(), kafkaServer.serverConfig().port()); - } - public void start() { kafkaServer.startup(); - System.out.println("embedded kafka is up"); + System.out.println("--embedded kafka is up"); } public void stop() { http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java index ee7cd50..983bfd9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java @@ -19,11 +19,8 @@ 
package org.apache.kylin.storage.hbase.util; import java.nio.charset.Charset; -import java.util.Arrays; import java.util.concurrent.ExecutorService; -import javax.annotation.Nullable; - import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -33,18 +30,12 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.lock.DistributedJobLock; -import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; - /** * the jobLock is specially used to support distributed scheduler. 
*/ @@ -65,7 +56,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { public ZookeeperDistributedJobLock(KylinConfig config) { this.config = config; - String zkConnectString = getZKConnectString(); + String zkConnectString = ZookeeperUtil.getZKConnectString(); logger.info("zk connection string:" + zkConnectString); if (StringUtils.isEmpty(zkConnectString)) { throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!"); @@ -243,19 +234,6 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock { } } - private static String getZKConnectString() { - Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); - final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); - final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); - return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { - @Nullable - @Override - public String apply(String input) { - return input + ":" + port; - } - }), ","); - } - private String getLockPath(String pathName) { return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName; } http://git-wip-us.apache.org/repos/asf/kylin/blob/f2e8b690/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java new file mode 100644 index 0000000..b5ebe89 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperUtil.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase.util; + +import java.util.Arrays; + +import javax.annotation.Nullable; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.kylin.storage.hbase.HBaseConnection; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +public class ZookeeperUtil { + + /** + * Get zookeeper connection string from HBase Configuration + * + * @return Zookeeper Connection string + */ + public static String getZKConnectString() { + Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); + final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM); + final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); + return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() { + @Nullable + @Override + public String apply(String input) { + return input + ":" + port; + } + }), ","); + } +}