half way
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/52d0bc17 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/52d0bc17 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/52d0bc17 Branch: refs/heads/KYLIN-2033 Commit: 52d0bc1703d89b68548005b071e4da9bb25234ed Parents: 478066d Author: Yang Li <liy...@apache.org> Authored: Thu Sep 22 21:03:39 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Thu Sep 22 21:04:05 2016 +0800 ---------------------------------------------------------------------- .../kylin/metadata/project/ProjectManager.java | 3 -- .../kylin/storage/hybrid/HybridManager.java | 54 ++++++++++++-------- .../engine/streaming/StreamingManager.java | 14 ++--- .../kylin/source/kafka/KafkaConfigManager.java | 18 ++++--- 4 files changed, 51 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index 972d40f..be69df3 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -31,9 +31,6 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.Broadcaster.Event; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.common.restclient.CaseInsensitiveStringCache; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.badquery.BadQueryHistoryManager; http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java index f772777..d73a1a9 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java @@ -29,8 +29,9 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.Broadcaster.Event; -import org.apache.kylin.cube.CubeManager.SyncListener; import org.apache.kylin.common.restclient.CaseInsensitiveStringCache; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.project.RealizationEntry; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.IRealizationProvider; @@ -85,42 +86,49 @@ public class HybridManager implements IRealizationProvider { private HybridManager(KylinConfig config) throws IOException { logger.info("Initializing HybridManager with config " + config); this.config = config; - this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid"); - Broadcaster.getInstance(config).registerListener(new SyncListener(), "hybrid", "cube"); - loadAllHybridInstance(); + this.hybridMap = new CaseInsensitiveStringCache<HybridInstance>(config, "hybrid", new SyncListener()); + reloadAllHybridInstance(); } - private class SyncListener implements Broadcaster.Listener { + private class SyncListener extends Broadcaster.Listener { + @Override - public void clearAll() { - // TODO Auto-generated method stub - + public void onClearAll(Broadcaster broadcaster) throws IOException { + clearCache(); } @Override - public void notify(String entity, Event event, String cacheKey) { - if (event == Event.CREATE || event == Event.UPDATE) { - switch (entity) { - case "hybrid": - loadAllHybridInstance(); - break; - case "cube": - reloadHybridInstanceByChild(RealizationType.CUBE, cacheKey); - break; + public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException { + for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) { + if (real instanceof HybridInstance) { + reloadHybridInstance(real.getName()); } } + } + + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { + String hybridName = cacheKey; + + if (event == Event.DROP) + hybridMap.removeLocal(hybridName); + else + reloadHybridInstance(hybridName); + for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.HYBRID, hybridName)) { + broadcaster.notifyProjectSchemaUpdate(prj.getName()); + } } } - private void loadAllHybridInstance() throws IOException { + private void reloadAllHybridInstance() throws IOException { ResourceStore store = getStore(); List<String> paths = store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json"); logger.debug("Loading Hybrid from folder " + store.getReadableResourcePath(ResourceStore.HYBRID_RESOURCE_ROOT)); for (String path : paths) { - loadHybridInstance(path); + reloadHybridInstanceAt(path); } logger.debug("Loaded " + paths.size() + " Hybrid(s)"); @@ -137,11 +145,15 @@ public class HybridManager implements IRealizationProvider { } if (includes == true) - loadHybridInstance(HybridInstance.concatResourcePath(hybridInstance.getName())); + reloadHybridInstance(hybridInstance.getName()); } } - private synchronized HybridInstance loadHybridInstance(String path) { + public void reloadHybridInstance(String name) { + reloadHybridInstanceAt(HybridInstance.concatResourcePath(name)); + } + + private synchronized HybridInstance reloadHybridInstanceAt(String path) { ResourceStore store = getStore(); HybridInstance hybridInstance = null; http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java index 87dd5d5..5a3f104 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java @@ -77,18 +77,18 @@ public class StreamingManager { reloadAllStreaming(); } - private class SyncListener implements Broadcaster.Listener { + private class SyncListener extends Broadcaster.Listener { @Override - public void clearAll() { - // TODO Auto-generated method stub - + public void onClearAll(Broadcaster broadcaster) throws IOException { + clearCache(); } @Override - public void notify(String entity, Event event, String cacheKey) throws IOException { - if (event == Event.CREATE || event == Event.UPDATE) { + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { + if (event == Event.DROP) + removeStreamingLocal(cacheKey); + else reloadStreamingConfigLocal(cacheKey); - } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java index a3b675b..8b982e2 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java @@ -78,18 +78,18 @@ public class KafkaConfigManager { reloadAllKafkaConfig(); } - private class SyncListener implements Broadcaster.Listener { + private class SyncListener extends Broadcaster.Listener { @Override - public void clearAll() { - // TODO Auto-generated method stub - + public void onClearAll(Broadcaster broadcaster) throws IOException { + clearCache(); } @Override - public void notify(String entity, Event event, String cacheKey) throws IOException { - if (event == Event.CREATE || event == Event.UPDATE) { + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { + if (event == Event.DROP) + removeKafkaConfigLocal(cacheKey); + else reloadKafkaConfigLocal(cacheKey); - } } } @@ -215,6 +215,10 @@ public class KafkaConfigManager { kafkaMap.remove(kafkaConfig.getName()); } + private void removeKafkaConfigLocal(String name) { + kafkaMap.remove(name); + } + private void reloadAllKafkaConfig() throws IOException { ResourceStore store = getStore(); logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));