half way

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/52d0bc17
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/52d0bc17
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/52d0bc17

Branch: refs/heads/KYLIN-2033
Commit: 52d0bc1703d89b68548005b071e4da9bb25234ed
Parents: 478066d
Author: Yang Li <liy...@apache.org>
Authored: Thu Sep 22 21:03:39 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Thu Sep 22 21:04:05 2016 +0800

----------------------------------------------------------------------
 .../kylin/metadata/project/ProjectManager.java  |  3 --
 .../kylin/storage/hybrid/HybridManager.java     | 54 ++++++++++++--------
 .../engine/streaming/StreamingManager.java      | 14 ++---
 .../kylin/source/kafka/KafkaConfigManager.java  | 18 ++++---
 4 files changed, 51 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 972d40f..be69df3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -31,9 +31,6 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.badquery.BadQueryHistoryManager;

http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index f772777..d73a1a9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -29,8 +29,9 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.Broadcaster.Event;
-import org.apache.kylin.cube.CubeManager.SyncListener;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationProvider;
@@ -85,42 +86,49 @@ public class HybridManager implements IRealizationProvider {
     private HybridManager(KylinConfig config) throws IOException {
         logger.info("Initializing HybridManager with config " + config);
         this.config = config;
-        this.hybridMap = new 
CaseInsensitiveStringCache<HybridInstance>(config, "hybrid");
-        Broadcaster.getInstance(config).registerListener(new SyncListener(), 
"hybrid", "cube");
-        loadAllHybridInstance();
+        this.hybridMap = new 
CaseInsensitiveStringCache<HybridInstance>(config, "hybrid", new 
SyncListener());
+        reloadAllHybridInstance();
     }
 
-    private class SyncListener implements Broadcaster.Listener {
+    private class SyncListener extends Broadcaster.Listener {
+        
         @Override
-        public void clearAll() {
-            // TODO Auto-generated method stub
-            
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
         }
 
         @Override
-        public void notify(String entity, Event event, String cacheKey) {
-            if (event == Event.CREATE || event == Event.UPDATE) {
-                switch (entity) {
-                case "hybrid":
-                    loadAllHybridInstance();
-                    break;
-                case "cube":
-                    reloadHybridInstanceByChild(RealizationType.CUBE, 
cacheKey);
-                    break;
+        public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {
+            for (IRealization real : 
ProjectManager.getInstance(config).listAllRealizations(project)) {
+                if (real instanceof HybridInstance) {
+                    reloadHybridInstance(real.getName());
                 }
             }
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            String hybridName = cacheKey;
+            
+            if (event == Event.DROP)
+                hybridMap.removeLocal(hybridName);
+            else
+                reloadHybridInstance(hybridName);
             
+            for (ProjectInstance prj : 
ProjectManager.getInstance(config).findProjects(RealizationType.HYBRID, 
hybridName)) {
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+            }
         }
     }
 
-    private void loadAllHybridInstance() throws IOException {
+    private void reloadAllHybridInstance() throws IOException {
         ResourceStore store = getStore();
         List<String> paths = 
store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json");
 
         logger.debug("Loading Hybrid from folder " + 
store.getReadableResourcePath(ResourceStore.HYBRID_RESOURCE_ROOT));
 
         for (String path : paths) {
-            loadHybridInstance(path);
+            reloadHybridInstanceAt(path);
         }
 
         logger.debug("Loaded " + paths.size() + " Hybrid(s)");
@@ -137,11 +145,15 @@ public class HybridManager implements IRealizationProvider {
             }
 
             if (includes == true)
-                
loadHybridInstance(HybridInstance.concatResourcePath(hybridInstance.getName()));
+                reloadHybridInstance(hybridInstance.getName());
         }
     }
 
-    private synchronized HybridInstance loadHybridInstance(String path) {
+    public void reloadHybridInstance(String name) {
+        reloadHybridInstanceAt(HybridInstance.concatResourcePath(name));
+    }
+    
+    private synchronized HybridInstance reloadHybridInstanceAt(String path) {
         ResourceStore store = getStore();
 
         HybridInstance hybridInstance = null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 87dd5d5..5a3f104 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -77,18 +77,18 @@ public class StreamingManager {
         reloadAllStreaming();
     }
 
-    private class SyncListener implements Broadcaster.Listener {
+    private class SyncListener extends Broadcaster.Listener {
         @Override
-        public void clearAll() {
-            // TODO Auto-generated method stub
-            
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
         }
 
         @Override
-        public void notify(String entity, Event event, String cacheKey) throws 
IOException {
-            if (event == Event.CREATE || event == Event.UPDATE) {
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                removeStreamingLocal(cacheKey);
+            else
                 reloadStreamingConfigLocal(cacheKey);
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/52d0bc17/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index a3b675b..8b982e2 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -78,18 +78,18 @@ public class KafkaConfigManager {
         reloadAllKafkaConfig();
     }
 
-    private class SyncListener implements Broadcaster.Listener {
+    private class SyncListener extends Broadcaster.Listener {
         @Override
-        public void clearAll() {
-            // TODO Auto-generated method stub
-            
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
         }
 
         @Override
-        public void notify(String entity, Event event, String cacheKey) throws 
IOException {
-            if (event == Event.CREATE || event == Event.UPDATE) {
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                removeKafkaConfigLocal(cacheKey);
+            else
                 reloadKafkaConfigLocal(cacheKey);
-            }
         }
     }
 
@@ -215,6 +215,10 @@ public class KafkaConfigManager {
         kafkaMap.remove(kafkaConfig.getName());
     }
 
+    private void removeKafkaConfigLocal(String name) {
+        kafkaMap.remove(name);
+    }
+    
     private void reloadAllKafkaConfig() throws IOException {
         ResourceStore store = getStore();
         logger.info("Reloading Kafka Metadata from folder " + 
store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));

Reply via email to