KYLIN-2624 pass IT
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1b864f04 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1b864f04 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1b864f04 Branch: refs/heads/KYLIN-2606 Commit: 1b864f04e3fbf50c08978c2f8533ead4ca9784c6 Parents: d21c817 Author: Yang Li <liy...@apache.org> Authored: Sat May 20 14:14:43 2017 +0800 Committer: hongbin ma <m...@kyligence.io> Committed: Sat May 20 14:19:07 2017 +0800 ---------------------------------------------------------------------- examples/test_case_data/localmeta/UUID | 1 + .../kylin/job/BaseTestDistributedScheduler.java | 2 + .../hbase/ITZookeeperDistributedLockTest.java | 257 +++++++++++++++++++ .../hbase/util/ZookeeperDistributedLock.java | 11 +- .../util/ITZookeeperDistributedLockTest.java | 254 ------------------ 5 files changed, 268 insertions(+), 257 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1b864f04/examples/test_case_data/localmeta/UUID ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/UUID b/examples/test_case_data/localmeta/UUID new file mode 100644 index 0000000..e3f5e6b --- /dev/null +++ b/examples/test_case_data/localmeta/UUID @@ -0,0 +1 @@ +$3591078f-9401-481a-a233-e7755c1142d0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/1b864f04/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java index 6343dfa..1ea6507 100644 --- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java +++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java @@ -113,6 +113,8 @@ public class BaseTestDistributedScheduler extends HBaseMetadataTestCase { @AfterClass public static void after() throws Exception { + jobLock1.purgeLocks(""); + if (scheduler1 != null) { scheduler1.shutdown(); scheduler1 = null; http://git-wip-us.apache.org/repos/asf/kylin/blob/1b864f04/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java new file mode 100644 index 0000000..48d6736 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITZookeeperDistributedLockTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.storage.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kylin.common.lock.DistributedLock; +import org.apache.kylin.common.lock.DistributedLock.Watcher; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.storage.hbase.util.ZookeeperDistributedLock; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ITZookeeperDistributedLockTest extends HBaseMetadataTestCase { + private static final Logger logger = LoggerFactory.getLogger(ITZookeeperDistributedLockTest.class); + private static final String ZK_PFX = "/test/ZookeeperDistributedLockTest/" + new Random().nextInt(10000000); + + static ZookeeperDistributedLock.Factory factory; + + @BeforeClass + public static void setup() throws Exception { + staticCreateTestMetadata(); + factory = new ZookeeperDistributedLock.Factory(); + } + + @AfterClass + public static void after() throws Exception { + staticCleanupTestMetadata(); + factory.lockForCurrentProcess().purgeLocks(ZK_PFX); + } + + @Test + public void testBasic() { + DistributedLock l = factory.lockForCurrentThread(); + String path = ZK_PFX + "/testBasic"; + + assertTrue(l.isLocked(path) == false); + assertTrue(l.lock(path)); + assertTrue(l.lock(path)); + assertTrue(l.lock(path)); + assertEquals(l.getClient(), l.peekLock(path)); + assertTrue(l.isLocked(path)); + assertTrue(l.isLockedByMe(path)); + l.unlock(path); + assertTrue(l.isLocked(path) == false); + } + + @Test + public void testErrorCases() { + DistributedLock c = factory.lockForClient("client1"); + DistributedLock d = factory.lockForClient("client2"); + String path = ZK_PFX + "/testErrorCases"; + + assertTrue(c.isLocked(path) == false); + assertTrue(d.peekLock(path) == null); + + assertTrue(c.lock(path)); + assertTrue(d.lock(path) == false); + assertTrue(d.isLocked(path) == true); + assertEquals(c.getClient(), d.peekLock(path)); + + try { + d.unlock(path); + fail(); + } catch (IllegalStateException ex) { + // expected + } + + c.unlock(path); + assertTrue(d.isLocked(path) == false); + + d.lock(path); + d.unlock(path); + } + + @Test + public void testLockTimeout() throws InterruptedException { + final DistributedLock c = factory.lockForClient("client1"); + final DistributedLock d = factory.lockForClient("client2"); + final String path = ZK_PFX + "/testLockTimeout"; + + assertTrue(c.isLocked(path) == false); + assertTrue(d.peekLock(path) == null); + + assertTrue(c.lock(path)); + new Thread() { + @Override + public void run() { + d.lock(path, 15000); + } + }.start(); + c.unlock(path); + + Thread.sleep(20000); + + assertTrue(c.isLocked(path)); + assertEquals(d.getClient(), d.peekLock(path)); + d.unlock(path); + } + + @Test + public void testWatch() throws InterruptedException, IOException { + // init lock paths + final String base = ZK_PFX + "/testWatch"; + final int nLocks = 4; + final String[] lockPaths = new String[nLocks]; + for (int i = 0; i < nLocks; i++) + lockPaths[i] = base + "/" + i; + + // init clients + final int[] clientIds = new int[] { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29 }; + final int nClients = clientIds.length; + final DistributedLock[] clients = new DistributedLock[nClients]; + for (int i = 0; i < nClients; i++) { + clients[i] = factory.lockForClient("" + clientIds[i]); + } + + // init watch + DistributedLock lock = factory.lockForClient(""); + final AtomicInteger lockCounter = new AtomicInteger(0); + final AtomicInteger unlockCounter = new AtomicInteger(0); + final AtomicInteger scoreCounter = new AtomicInteger(0); + Closeable watch = lock.watchLocks(base, Executors.newFixedThreadPool(1), new Watcher() { + + @Override + public void onLock(String lockPath, String client) { + lockCounter.incrementAndGet(); + int cut = lockPath.lastIndexOf("/"); + int lockId = Integer.parseInt(lockPath.substring(cut + 1)) + 1; + int clientId = Integer.parseInt(client); + scoreCounter.addAndGet(lockId * clientId); + } + + @Override + public void onUnlock(String lockPath, String client) { + unlockCounter.incrementAndGet(); + } + }); + + // init client threads + ClientThread[] threads = new ClientThread[nClients]; + for (int i = 0; i < nClients; i++) { + DistributedLock client = clients[i]; + threads[i] = new ClientThread(client, lockPaths); + threads[i].start(); + } + + // wait done + for (int i = 0; i < nClients; i++) { + threads[i].join(); + } + + Thread.sleep(3000); + + // verify counters + assertEquals(0, lockCounter.get() - unlockCounter.get()); + int clientSideScore = 0; + int clientSideLocks = 0; + for (int i = 0; i < nClients; i++) { + clientSideScore += threads[i].scoreCounter; + clientSideLocks += threads[i].lockCounter; + } + // The counters match perfectly on Windows but not on Linux, for unknown reason... + logger.info("client side locks is {} and watcher locks is {}", clientSideLocks, lockCounter.get()); + logger.info("client side score is {} and watcher score is {}", clientSideScore, scoreCounter.get()); + //assertEquals(clientSideLocks, lockCounter.get()); + //assertEquals(clientSideScore, scoreCounter.get()); + watch.close(); + + // assert all locks were released + for (int i = 0; i < nLocks; i++) { + assertTrue(lock.isLocked(lockPaths[i]) == false); + } + } + + class ClientThread extends Thread { + DistributedLock client; + String[] lockPaths; + int nLocks; + int lockCounter = 0; + int scoreCounter = 0; + + ClientThread(DistributedLock client, String[] lockPaths) { + this.client = client; + this.lockPaths = lockPaths; + this.nLocks = lockPaths.length; + } + + @Override + public void run() { + long start = System.currentTimeMillis(); + Random rand = new Random(); + + while (System.currentTimeMillis() - start <= 15000) { + try { + Thread.sleep(rand.nextInt(200)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // random lock + int lockIdx = rand.nextInt(nLocks); + if (client.isLockedByMe(lockPaths[lockIdx]) == false) { + boolean locked = client.lock(lockPaths[lockIdx]); + if (locked) { + lockCounter++; + scoreCounter += (lockIdx + 1) * Integer.parseInt(client.getClient()); + } + } + + // random unlock + try { + lockIdx = rand.nextInt(nLocks); + client.unlock(lockPaths[lockIdx]); + } catch (IllegalStateException e) { + // ignore + } + } + + // clean up, unlock all + for (String lockPath : lockPaths) { + try { + client.unlock(lockPath); + } catch (IllegalStateException e) { + // ignore + } + } + } + }; +} http://git-wip-us.apache.org/repos/asf/kylin/blob/1b864f04/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java index ea64bbf..84a6fd0 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedLock.java @@ -102,7 +102,7 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { public Factory(KylinConfig config) { this.curator = getZKClient(config); - this.zkPathBase = "/kylin/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); + this.zkPathBase = fixSlash("/kylin/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix()); } @Override @@ -284,10 +284,15 @@ public class ZookeeperDistributedLock implements DistributedLock, JobLock { if (!lockPath.startsWith(zkPathBase)) lockPath = zkPathBase + (lockPath.startsWith("/") ? "" : "/") + lockPath; - return dropDoubleSlash(lockPath); + return fixSlash(lockPath); } - public static String dropDoubleSlash(String path) { + private static String fixSlash(String path) { + if (!path.startsWith("/")) + path = "/" + path; + if (path.endsWith("/")) + path = path.substring(0, path.length() - 1); + for (int n = Integer.MAX_VALUE; n > path.length();) { n = path.length(); path = path.replace("//", "/"); http://git-wip-us.apache.org/repos/asf/kylin/blob/1b864f04/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java deleted file mode 100644 index 1bfe8f7..0000000 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/util/ITZookeeperDistributedLockTest.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.kylin.common.lock.DistributedLock; -import org.apache.kylin.common.lock.DistributedLock.Watcher; -import org.apache.kylin.common.util.HBaseMetadataTestCase; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ITZookeeperDistributedLockTest extends HBaseMetadataTestCase { - private static final Logger logger = LoggerFactory.getLogger(ITZookeeperDistributedLockTest.class); - private static final String ZK_PFX = "/test/ZookeeperDistributedLockTest/" + new Random().nextInt(10000000); - - static ZookeeperDistributedLock.Factory factory; - - @BeforeClass - public static void setup() throws Exception { - staticCreateTestMetadata(); - factory = new ZookeeperDistributedLock.Factory(); - } - - @AfterClass - public static void after() throws Exception { - staticCleanupTestMetadata(); - factory.lockForCurrentProcess().purgeLocks(ZK_PFX); - } - - @Test - public void testBasic() { - DistributedLock l = factory.lockForCurrentThread(); - String path = ZK_PFX + "/testBasic"; - - assertTrue(l.isLocked(path) == false); - assertTrue(l.lock(path)); - assertTrue(l.lock(path)); - assertTrue(l.lock(path)); - assertEquals(l.getClient(), l.peekLock(path)); - assertTrue(l.isLocked(path)); - assertTrue(l.isLockedByMe(path)); - l.unlock(path); - assertTrue(l.isLocked(path) == false); - } - - @Test - public void testErrorCases() { - DistributedLock c = factory.lockForClient("client1"); - DistributedLock d = factory.lockForClient("client2"); - String path = ZK_PFX + "/testErrorCases"; - - assertTrue(c.isLocked(path) == false); - assertTrue(d.peekLock(path) == null); - - assertTrue(c.lock(path)); - assertTrue(d.lock(path) == false); - assertTrue(d.isLocked(path) == true); - assertEquals(c.getClient(), d.peekLock(path)); - - try { - d.unlock(path); - fail(); - } catch (IllegalStateException ex) { - // expected - } - - c.unlock(path); - assertTrue(d.isLocked(path) == false); - - d.lock(path); - d.unlock(path); - } - - @Test - public void testLockTimeout() throws InterruptedException { - final DistributedLock c = factory.lockForClient("client1"); - final DistributedLock d = factory.lockForClient("client2"); - final String path = ZK_PFX + "/testLockTimeout"; - - assertTrue(c.isLocked(path) == false); - assertTrue(d.peekLock(path) == null); - - assertTrue(c.lock(path)); - new Thread() { - @Override - public void run() { - d.lock(path, 10000); - } - }.start(); - c.unlock(path); - - Thread.sleep(10000); - - assertTrue(c.isLocked(path)); - assertEquals(d.getClient(), d.peekLock(path)); - d.unlock(path); - } - - @Test - public void testWatch() throws InterruptedException, IOException { - // init lock paths - final String base = ZK_PFX + "/testWatch"; - final int nLocks = 4; - final String[] lockPaths = new String[nLocks]; - for (int i = 0; i < nLocks; i++) - lockPaths[i] = base + "/" + i; - - // init clients - final int[] clientIds = new int[] { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29 }; - final int nClients = clientIds.length; - final DistributedLock[] clients = new DistributedLock[nClients]; - for (int i = 0; i < nClients; i++) { - clients[i] = factory.lockForClient("" + clientIds[i]); - } - - // init watch - DistributedLock lock = factory.lockForClient(""); - final AtomicInteger lockCounter = new AtomicInteger(0); - final AtomicInteger unlockCounter = new AtomicInteger(0); - final AtomicInteger scoreCounter = new AtomicInteger(0); - Closeable watch = lock.watchLocks(base, Executors.newFixedThreadPool(1), new Watcher() { - - @Override - public void onLock(String lockPath, String client) { - lockCounter.incrementAndGet(); - int cut = lockPath.lastIndexOf("/"); - int lockId = Integer.parseInt(lockPath.substring(cut + 1)) + 1; - int clientId = Integer.parseInt(client); - scoreCounter.addAndGet(lockId * clientId); - } - - @Override - public void onUnlock(String lockPath, String client) { - unlockCounter.incrementAndGet(); - } - }); - - // init client threads - ClientThread[] threads = new ClientThread[nClients]; - for (int i = 0; i < nClients; i++) { - DistributedLock client = clients[i]; - threads[i] = new ClientThread(client, lockPaths); - threads[i].start(); - } - - // wait done - for (int i = 0; i < nClients; i++) { - threads[i].join(); - } - - // verify counters - assertEquals(0, lockCounter.get() - unlockCounter.get()); - int clientSideScore = 0; - int clientSideLocks = 0; - for (int i = 0; i < nClients; i++) { - clientSideScore += threads[i].scoreCounter; - clientSideLocks += threads[i].lockCounter; - } - // The counters match perfectly on Windows but not on Linux, for unknown reason... - logger.info("client side locks is {} and watcher locks is {}", clientSideLocks, lockCounter.get()); - logger.info("client side score is {} and watcher score is {}", clientSideScore, scoreCounter.get()); - //assertEquals(clientSideLocks, lockCounter.get()); - //assertEquals(clientSideScore, scoreCounter.get()); - watch.close(); - - // assert all locks were released - for (int i = 0; i < nLocks; i++) { - assertTrue(lock.isLocked(lockPaths[i]) == false); - } - } - - class ClientThread extends Thread { - DistributedLock client; - String[] lockPaths; - int nLocks; - int lockCounter = 0; - int scoreCounter = 0; - - ClientThread(DistributedLock client, String[] lockPaths) { - this.client = client; - this.lockPaths = lockPaths; - this.nLocks = lockPaths.length; - } - - @Override - public void run() { - long start = System.currentTimeMillis(); - Random rand = new Random(); - - while (System.currentTimeMillis() - start <= 15000) { - try { - Thread.sleep(rand.nextInt(200)); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - // random lock - int lockIdx = rand.nextInt(nLocks); - if (client.isLockedByMe(lockPaths[lockIdx]) == false) { - boolean locked = client.lock(lockPaths[lockIdx]); - if (locked) { - lockCounter++; - scoreCounter += (lockIdx + 1) * Integer.parseInt(client.getClient()); - } - } - - // random unlock - try { - lockIdx = rand.nextInt(nLocks); - client.unlock(lockPaths[lockIdx]); - } catch (IllegalStateException e) { - // ignore - } - } - - // clean up, unlock all - for (String lockPath : lockPaths) { - try { - client.unlock(lockPath); - } catch (IllegalStateException e) { - // ignore - } - } - } - }; -}