KYLIN-2506 Add distributed lock for GlobalDictionaryBuilder
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0018fafc Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0018fafc Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0018fafc Branch: refs/heads/KYLIN-2506 Commit: 0018fafc7005f9b72dfff9ba4ae53054e3ff63ad Parents: 6416df1 Author: kangkaisen <kangkai...@163.com> Authored: Mon Apr 10 16:47:46 2017 +0800 Committer: kangkaisen <kangkai...@163.com> Committed: Fri Apr 14 14:02:29 2017 +0800 ---------------------------------------------------------------------- .../kylin/dict/AppendTrieDictionaryBuilder.java | 12 +- .../apache/kylin/dict/GlobalDictHDFSStore.java | 18 +-- .../org/apache/kylin/dict/GlobalDictStore.java | 6 +- .../kylin/dict/GlobalDictionaryBuilder.java | 103 ++++++++++++++- .../kylin/dict/AppendTrieDictionaryTest.java | 76 +----------- .../dict/ITGlobalDictionaryBuilderTest.java | 124 +++++++++++++++++++ 6 files changed, 235 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java index c35a815..efa681b 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java @@ -20,8 +20,6 @@ package org.apache.kylin.dict; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.BytesUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; @@ -30,7 +28,6 @@ import java.util.TreeMap; import static com.google.common.base.Preconditions.checkState; public class AppendTrieDictionaryBuilder { - private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionaryBuilder.class); private final String baseDir; private final String workingDir; @@ -42,7 +39,6 @@ public class AppendTrieDictionaryBuilder { private int nValues; private BytesConverter bytesConverter; private TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>(); // slice key -> slice file name - private int counter; private DictSliceKey curKey; private DictNode curNode; @@ -77,11 +73,7 @@ public class AppendTrieDictionaryBuilder { } @SuppressWarnings("unchecked") - public void addValue(String value) { - if (counter++ > 0 && counter % 1_000_000 == 0) { - logger.info("processed {} values", counter); - } - + public void addValue(String value) throws IOException { byte[] valueBytes = bytesConverter.convertToBytes(value); if (sliceFileMap.isEmpty()) { @@ -134,7 +126,7 @@ public class AppendTrieDictionaryBuilder { return dict; } - private void flushCurrentNode() { + private void flushCurrentNode() throws IOException { String newSliceFile = store.writeSlice(workingDir, curKey, curNode); String oldSliceFile = sliceFileMap.put(curKey, newSliceFile); if (oldSliceFile != null) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java index d9030d3..7cf5591 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java @@ -175,18 +175,16 @@ public class GlobalDictHDFSStore extends GlobalDictStore { } @Override - DictSlice readSlice(String directory, String sliceFileName) { + DictSlice readSlice(String directory, String sliceFileName) throws IOException { Path path = new Path(directory, sliceFileName); logger.info("read slice from {}", path); try (FSDataInputStream input = fileSystem.open(path, BUFFER_SIZE)) { return DictSlice.deserializeFrom(input); - } catch (IOException e) { - throw new RuntimeException(String.format("read slice %s failed", path), e); } } @Override - String writeSlice(String workingDir, DictSliceKey key, DictNode slice) { + String writeSlice(String workingDir, DictSliceKey key, DictNode slice) throws IOException { //write new slice String sliceFile = IndexFormatV2.sliceFileName(key); Path path = new Path(workingDir, sliceFile); @@ -195,22 +193,16 @@ public class GlobalDictHDFSStore extends GlobalDictStore { try (FSDataOutputStream out = fileSystem.create(path, true, BUFFER_SIZE)) { byte[] bytes = slice.buildTrieBytes(); out.write(bytes); - } catch (IOException e) { - throw new RuntimeException(String.format("write slice with key %s into file %s failed", key, path), e); } return sliceFile; } @Override - void deleteSlice(String workingDir, String sliceFileName) { + void deleteSlice(String workingDir, String sliceFileName) throws IOException { Path path = new Path(workingDir, sliceFileName); logger.info("delete slice at {}", path); - try { - if (fileSystem.exists(path)) { - fileSystem.delete(path, false); - } - } catch (IOException e) { - throw new RuntimeException(String.format("delete slice at %s failed", path), e); + if (fileSystem.exists(path)) { + fileSystem.delete(path, false); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java index 5817868..6a7a20c 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java @@ -63,7 +63,7 @@ public abstract class GlobalDictStore { * @return a <i>DictSlice</i> * @throws IOException on I/O error */ - abstract DictSlice readSlice(String workingDir, String sliceFileName); + abstract DictSlice readSlice(String workingDir, String sliceFileName) throws IOException; /** * Write a slice with the given key to the specified directory. @@ -73,7 +73,7 @@ public abstract class GlobalDictStore { * @return file name of the new written slice * @throws IOException on I/O error */ - abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice); + abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice) throws IOException; /** * Delete a slice with the specified file name. @@ -81,7 +81,7 @@ public abstract class GlobalDictStore { * @param sliceFileName file name of the slice, should exist * @throws IOException on I/O error */ - abstract void deleteSlice(String workingDir, String sliceFileName); + abstract void deleteSlice(String workingDir, String sliceFileName) throws IOException; /** * commit the <i>DictSlice</i> and <i>GlobalDictMetadata</i> in workingDir to new versionDir http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index 7921980..9d66b12 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@ -19,9 +19,17 @@ package org.apache.kylin.dict; import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.lock.DistributedJobLock; +import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Dictionary; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments. @@ -29,8 +37,16 @@ import org.apache.kylin.common.util.Dictionary; * Created by sunyerui on 16/5/24. */ public class GlobalDictionaryBuilder implements IDictionaryBuilder { - AppendTrieDictionaryBuilder builder; - int baseId; + private AppendTrieDictionaryBuilder builder; + private int baseId; + + private DistributedJobLock lock; + private String sourceColumn; + //the job thread name is UUID+threadID + private final String jobUUID = Thread.currentThread().getName(); + private int counter; + + private static Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class); @Override public void init(DictionaryInfo dictInfo, int baseId) throws IOException { @@ -38,6 +54,9 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder { throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo"); } + sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn(); + lock(sourceColumn); + int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); this.builder = new AppendTrieDictionaryBuilder(dictInfo.getResourceDir(), maxEntriesPerSlice); this.baseId = baseId; @@ -45,14 +64,88 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder { @Override public boolean addValue(String value) { - if (value == null) + if (++counter % 1_000_000 == 0) { + if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) { + logger.info("processed {} values", counter); + } else { + throw new RuntimeException("Failed to create global dictionary on " + sourceColumn + " This client doesn't keep the lock"); + } + } + + if (value == null) { return false; - builder.addValue(value); + } + + try { + builder.addValue(value); + } catch (Throwable e) { + checkAndUnlock(); + throw new RuntimeException(String.format("Failed to create global dictionary on %s ", sourceColumn), e); + } + return true; } @Override public Dictionary<String> build() throws IOException { - return builder.build(baseId); + try { + if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) { + return builder.build(baseId); + } + } finally { + checkAndUnlock(); + } + return new AppendTrieDictionary<>(); + } + + private void lock(final String sourceColumn) throws IOException { + lock = (DistributedJobLock) ClassUtil.newInstance("org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock"); + + if (!lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) { + logger.info("{} will wait the lock for {} ", jobUUID, sourceColumn); + + final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1); + + PathChildrenCache cache = lock.watch(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedJobLock.WatcherProcess() { + @Override + public void process(String path, String data) { + if (!data.equalsIgnoreCase(jobUUID) && lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) { + try { + bq.put("getLock"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + }); + + long start = System.currentTimeMillis(); + + try { + bq.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + cache.close(); + } + + logger.info("{} has waited the lock {} ms for {} ", jobUUID, (System.currentTimeMillis() - start), sourceColumn); + } + } + + private void checkAndUnlock() { + if (lock.lockWithClient(getLockPath(sourceColumn), jobUUID)) { + lock.unlock(getLockPath(sourceColumn)); + } + } + + private static final String GLOBAL_DICT_LOCK_PATH = "/kylin/dict/lock"; + + private String getLockPath(String pathName) { + return GLOBAL_DICT_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName + "/lock"; + } + + private String getWatchPath(String pathName) { + return GLOBAL_DICT_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java index 9da5071..e863901 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java @@ -22,7 +22,6 @@ import static org.apache.kylin.dict.GlobalDictHDFSStore.V2_INDEX_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.BufferedReader; @@ -44,8 +43,6 @@ import java.util.Map; import java.util.Random; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,18 +55,14 @@ import org.junit.Ignore; import org.junit.Test; public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { - - private static final UUID uuid = UUID.randomUUID(); - private static final String RESOURCE_DIR = "/dict/append_dict_test/" + uuid; - private static final String HDFS_DIR = "file:///tmp/kylin_append_dict"; + private static final String RESOURCE_DIR = "/dict/append_dict_test/" + UUID.randomUUID(); private static String BASE_DIR; - private static String LOCAL_BASE_DIR = "/tmp/kylin_append_dict/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/"; + private static String LOCAL_BASE_DIR = "/tmp/kylin/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/"; @Before public void beforeTest() { staticCreateTestMetadata(); KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000"); - KylinConfig.getInstanceFromEnv().setProperty("kylin.env.hdfs-working-dir", HDFS_DIR); BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/"; } @@ -80,7 +73,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { } private void cleanup() { - Path basePath = new Path(HDFS_DIR); + Path basePath = new Path(BASE_DIR); try { HadoopUtil.getFileSystem(basePath).delete(basePath, true); } catch (IOException e) { @@ -318,69 +311,6 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase { dictionary.getMaxId(); } - private class SharedBuilderThread extends Thread { - CountDownLatch startLatch; - CountDownLatch finishLatch; - String prefix; - int count; - - SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) { - this.startLatch = startLatch; - this.finishLatch = finishLatch; - this.prefix = prefix; - this.count = count; - } - - @Override - public void run() { - try { - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - startLatch.countDown(); - for (int i = 0; i < count; i++) { - builder.addValue(prefix + i); - } - builder.build(0); - finishLatch.countDown(); - } catch (IOException e) { - } - } - } - - @Ignore - @Test - public void testSharedBuilder() throws IOException, InterruptedException { - final CountDownLatch startLatch = new CountDownLatch(3); - final CountDownLatch finishLatch = new CountDownLatch(3); - - AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR); - Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000); - Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10); - Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000); - t1.start(); - t2.start(); - t3.start(); - startLatch.await(); - AppendTrieDictionary dict = builder.build(0); - assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS)); - assertEquals(110010, dict.getMaxId()); - - builder = createBuilder(RESOURCE_DIR); - builder.addValue("success"); - builder.addValue("s"); - dict = builder.build(0); - for (int i = 0; i < 10000; i++) { - assertNotEquals(-1, dict.getIdFromValue("t1_" + i)); - } - for (int i = 0; i < 10; i++) { - assertNotEquals(-1, dict.getIdFromValue("t2_" + i)); - } - for (int i = 0; i < 100000; i++) { - assertNotEquals(-1, dict.getIdFromValue("t3_" + i)); - } - assertEquals(110011, dict.getIdFromValue("success")); - assertEquals(110012, dict.getIdFromValue("s")); - } - @Test public void testSplitContainSuperLongValue() throws IOException { String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B"; http://git-wip-us.apache.org/repos/asf/kylin/blob/0018fafc/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java new file mode 100644 index 0000000..4afaccd --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict; + +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.common.util.HadoopUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Created by kangkaisen on 2017/4/10. + */ +public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase { + private DictionaryInfo dictionaryInfo; + + @Before + public void beforeTest() throws Exception { + staticCreateTestMetadata(); + dictionaryInfo = new DictionaryInfo("testTable", "testColumn", 0, "String", null); + } + + @After + public void afterTest() { + cleanup(); + staticCleanupTestMetadata(); + } + + private void cleanup() { + String BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + dictionaryInfo.getResourceDir() + "/"; + Path basePath = new Path(BASE_DIR); + try { + HadoopUtil.getFileSystem(basePath).delete(basePath, true); + } catch (IOException e) { + } + } + + @Test + public void testGlobalDictLock() throws IOException, InterruptedException { + final CountDownLatch startLatch = new CountDownLatch(3); + final CountDownLatch finishLatch = new CountDownLatch(3); + + Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000); + Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10); + Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000); + t1.start(); + t2.start(); + t3.start(); + startLatch.await(); + finishLatch.await(); + + GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder(); + builder.init(dictionaryInfo, 0); + builder.addValue("success"); + Dictionary<String> dict = builder.build(); + + for (int i = 0; i < 10000; i++) { + assertNotEquals(-1, dict.getIdFromValue("t1_" + i)); + } + for (int i = 0; i < 10; i++) { + assertNotEquals(-1, dict.getIdFromValue("t2_" + i)); + } + for (int i = 0; i < 100000; i++) { + assertNotEquals(-1, dict.getIdFromValue("t3_" + i)); + } + + assertEquals(110011, dict.getIdFromValue("success")); + } + + private class SharedBuilderThread extends Thread { + CountDownLatch startLatch; + CountDownLatch finishLatch; + String prefix; + int count; + + SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) { + this.startLatch = startLatch; + this.finishLatch = finishLatch; + this.prefix = prefix; + this.count = count; + } + + @Override + public void run() { + try { + GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder(); + startLatch.countDown(); + + builder.init(dictionaryInfo, 0); + for (int i = 0; i < count; i++) { + builder.addValue(prefix + i); + } + builder.build(); + finishLatch.countDown(); + } catch (IOException e) { + } + } + } +}