This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch 2.5.x in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 542720e4f6433ea21495e0256fc04ecfe3b03ba8 Author: nichunen <chunen...@kyligence.io> AuthorDate: Tue Oct 23 20:46:58 2018 +0800 KYLIN-3633 Avoid potential dead lock when building global dictionary --- .../org/apache/kylin/dict/DictionaryGenerator.java | 47 +++++++++++++-- .../apache/kylin/dict/GlobalDictionaryBuilder.java | 8 ++- .../org/apache/kylin/dict/IDictionaryBuilder.java | 3 + .../dict/global/SegmentAppendTrieDictBuilder.java | 5 ++ .../kylin/dict/ITGlobalDictionaryBuilderTest.java | 68 ++++++++++++++++++---- 5 files changed, 113 insertions(+), 18 deletions(-) diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java index db0c302..7c33b4a 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java @@ -75,13 +75,19 @@ public class DictionaryGenerator { builder.init(dictInfo, baseId, null); // add values - while (valueEnumerator.moveNext()) { - String value = valueEnumerator.current(); + try { + while (valueEnumerator.moveNext()) { + String value = valueEnumerator.current(); - boolean accept = builder.addValue(value); + boolean accept = builder.addValue(value); - if (accept && samples.size() < nSamples && samples.contains(value) == false) - samples.add(value); + if (accept && samples.size() < nSamples && samples.contains(value) == false) + samples.add(value); + } + } catch (IOException e) { + logger.error("Error during adding dict value.", e); + builder.clear(); + throw e; } // build @@ -149,6 +155,12 @@ public class DictionaryGenerator { return new DateStrDictionary(datePattern, baseId); } + + + @Override + public void clear() { + // do nothing + } } private static class TimeDictBuilder implements IDictionaryBuilder { @@ -171,6 +183,11 @@ public class DictionaryGenerator { public Dictionary<String> build() throws IOException { return new TimeStrDictionary(); // base ID is always 0 } + + @Override + public void clear() { + + } } private static class StringTrieDictBuilder implements IDictionaryBuilder { @@ -196,6 +213,11 @@ public class DictionaryGenerator { public Dictionary<String> build() throws IOException { return builder.build(baseId); } + + @Override + public void clear() { + + } } private static class StringTrieDictForestBuilder implements IDictionaryBuilder { @@ -219,6 +241,11 @@ public class DictionaryGenerator { public Dictionary<String> build() throws IOException { return builder.build(); } + + @Override + public void clear() { + + } } @SuppressWarnings("deprecation") @@ -245,6 +272,11 @@ public class DictionaryGenerator { public Dictionary<String> build() throws IOException { return builder.build(baseId); } + + @Override + public void clear() { + + } } private static class NumberTrieDictForestBuilder implements IDictionaryBuilder { @@ -268,6 +300,11 @@ public class DictionaryGenerator { public Dictionary<String> build() throws IOException { return builder.build(); } + + @Override + public void clear() { + + } } } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index fd7affd..00410e7 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@ -19,7 +19,6 @@ package org.apache.kylin.dict; import java.io.IOException; - import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.util.Dictionary; @@ -100,6 +99,13 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder { return new AppendTrieDictionary<>(); } + @Override + public void clear() { + if (lock.isLocked(getLockPath(sourceColumn))) { + lock.unlock(getLockPath(sourceColumn)); + } + } + private String getLockPath(String pathName) { return "/dict/" + pathName + "/lock"; } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java index e2a643d..771bfb4 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java @@ -35,4 +35,7 @@ public interface IDictionaryBuilder { /** Build the dictionary */ Dictionary<String> build() throws IOException; + + /** Clear before exit */ + void clear(); } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java index 05dd5d2..c5b61b5 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java @@ -75,4 +75,9 @@ public class SegmentAppendTrieDictBuilder implements IDictionaryBuilder { public Dictionary<String> build() throws IOException { return builder.build(baseId); } + + @Override + public void clear() { + + } } diff --git a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java index c578a57..94c4f56 100644 --- a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java @@ -18,20 +18,21 @@ package org.apache.kylin.dict; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.common.util.HadoopUtil; import org.junit.After; +import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; - -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; +import org.junit.rules.ExpectedException; public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase { private DictionaryInfo dictionaryInfo; @@ -48,8 +49,12 @@ public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase { staticCleanupTestMetadata(); } + @Rule + public ExpectedException thrown = ExpectedException.none(); + private void cleanup() { - String BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + dictionaryInfo.getResourceDir() + "/"; + String BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + + dictionaryInfo.getResourceDir() + "/"; Path basePath = new Path(BASE_DIR); try { HadoopUtil.getFileSystem(basePath).delete(basePath, true); @@ -77,16 +82,33 @@ public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase { Dictionary<String> dict = builder.build(); for (int i = 0; i < 10000; i++) { - assertNotEquals(-1, dict.getIdFromValue("t1_" + i)); + Assert.assertNotEquals(-1, dict.getIdFromValue("t1_" + i)); } for (int i = 0; i < 10; i++) { - assertNotEquals(-1, dict.getIdFromValue("t2_" + i)); + Assert.assertNotEquals(-1, dict.getIdFromValue("t2_" + i)); } for (int i = 0; i < 100000; i++) { - assertNotEquals(-1, dict.getIdFromValue("t3_" + i)); + Assert.assertNotEquals(-1, dict.getIdFromValue("t3_" + i)); } - assertEquals(110011, dict.getIdFromValue("success")); + Assert.assertEquals(110011, dict.getIdFromValue("success")); + } + + @Test + public void testBuildGlobalDictFailed() throws IOException { + thrown.expect(IOException.class); + thrown.expectMessage("read failed."); + + GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder(); + try { + DictionaryGenerator.buildDictionary(builder, dictionaryInfo, new ErrorDictionaryValueEnumerator()); + } catch (Throwable e) { + DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread(); + String lockPath = "/dict/" + dictionaryInfo.getSourceTable() + "_" + dictionaryInfo.getSourceColumn() + + "/lock"; + Assert.assertFalse(lock.isLocked(lockPath)); + throw e; + } } private class SharedBuilderThread extends Thread { @@ -118,4 +140,26 @@ public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase { } } } -} + + private class ErrorDictionaryValueEnumerator implements IDictionaryValueEnumerator { + private int idx = 0; + + @Override + public String current() throws IOException { + return null; + } + + @Override + public boolean moveNext() throws IOException { + idx++; + if (idx == 1) + throw new IOException("read failed."); + return true; + } + + @Override + public void close() throws IOException { + + } + } +} \ No newline at end of file