KYLIN-2033 Broadcaster stronger sync locking and more comments
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e10f2b92 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e10f2b92 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e10f2b92 Branch: refs/heads/1.5.x-CDH5.7 Commit: e10f2b922006a002ea9cb58ff11a4ecd9aa749c9 Parents: bf127a9 Author: Yang Li <liy...@apache.org> Authored: Sun Sep 25 13:54:51 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Sun Sep 25 13:54:51 2016 +0800 ---------------------------------------------------------------------- .../kylin/metadata/cachesync/Broadcaster.java | 119 ++++++++++--------- 1 file changed, 65 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e10f2b92/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 75b2333..8d34cc0 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -43,7 +43,16 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** - * Broadcast kylin event out + * Broadcast metadata changes across all Kylin servers. + * + * The origin server announce the event via Rest API to all Kylin servers including itself. + * On target server, listeners are registered to process events. As part of processing, a + * listener can re-notify a new event to other local listeners. + * + * A typical project schema change event: + * - model is update on origin server, a "model" update event is announced + * - on all servers, model listener is invoked, reload the model, and notify a "project_schema" update event + * - all listeners respond to the "project_schema" update -- reload cube desc, clear project L2 cache, clear calcite data source etc */ public class Broadcaster { @@ -57,13 +66,9 @@ public class Broadcaster { private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = new ConcurrentHashMap<KylinConfig, Broadcaster>(); public static Broadcaster getInstance(KylinConfig config) { - Broadcaster r = CACHE.get(config); - if (r != null) { - return r; - } - synchronized (Broadcaster.class) { - r = CACHE.get(config); + synchronized (CACHE) { + Broadcaster r = CACHE.get(config); if (r != null) { return r; } @@ -79,7 +84,9 @@ public class Broadcaster { // call Broadcaster.getInstance().notifyClearAll() to clear cache static void clearCache() { - CACHE.clear(); + synchronized (CACHE) { + CACHE.clear(); + } } // ============================================================================ @@ -134,22 +141,24 @@ public class Broadcaster { } public void registerListener(Listener listener, String... entities) { - // ignore re-registration - List<Listener> all = listenerMap.get(SYNC_ALL); - if (all != null && all.contains(listener)) { - return; - } + synchronized (CACHE) { + // ignore re-registration + List<Listener> all = listenerMap.get(SYNC_ALL); + if (all != null && all.contains(listener)) { + return; + } - for (String entity : entities) { - if (!StringUtils.isBlank(entity)) - addListener(entity, listener); + for (String entity : entities) { + if (!StringUtils.isBlank(entity)) + addListener(entity, listener); + } + addListener(SYNC_ALL, listener); + addListener(SYNC_PRJ_SCHEMA, listener); + addListener(SYNC_PRJ_DATA, listener); } - addListener(SYNC_ALL, listener); - addListener(SYNC_PRJ_SCHEMA, listener); - addListener(SYNC_PRJ_DATA, listener); } - synchronized private void addListener(String entity, Listener listener) { + private void addListener(String entity, Listener listener) { List<Listener> list = listenerMap.get(entity); if (list == null) { list = new ArrayList<>(); @@ -170,42 +179,44 @@ public class Broadcaster { notifyListener(SYNC_PRJ_DATA, Event.UPDATE, project); } - public synchronized void notifyListener(String entity, Event event, String cacheKey) throws IOException { - List<Listener> list = listenerMap.get(entity); - if (list == null) - return; - - logger.debug("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list); - - // prevents concurrent modification exception - list = Lists.newArrayList(list); - switch (entity) { - case SYNC_ALL: - for (Listener l : list) { - l.onClearAll(this); - } - clearCache(); // clear broadcaster too in the end - break; - case SYNC_PRJ_SCHEMA: - ProjectManager.getInstance(config).clearL2Cache(); - for (Listener l : list) { - l.onProjectSchemaChange(this, cacheKey); - } - break; - case SYNC_PRJ_DATA: - ProjectManager.getInstance(config).clearL2Cache(); // cube's first becoming ready leads to schema change too - for (Listener l : list) { - l.onProjectDataChange(this, cacheKey); - } - break; - default: - for (Listener l : list) { - l.onEntityChange(this, entity, event, cacheKey); + public void notifyListener(String entity, Event event, String cacheKey) throws IOException { + synchronized (CACHE) { + List<Listener> list = listenerMap.get(entity); + if (list == null) + return; + + logger.debug("Broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey + ", listeners=" + list); + + // prevents concurrent modification exception + list = Lists.newArrayList(list); + switch (entity) { + case SYNC_ALL: + for (Listener l : list) { + l.onClearAll(this); + } + clearCache(); // clear broadcaster too in the end + break; + case SYNC_PRJ_SCHEMA: + ProjectManager.getInstance(config).clearL2Cache(); + for (Listener l : list) { + l.onProjectSchemaChange(this, cacheKey); + } + break; + case SYNC_PRJ_DATA: + ProjectManager.getInstance(config).clearL2Cache(); // cube's first becoming ready leads to schema change too + for (Listener l : list) { + l.onProjectDataChange(this, cacheKey); + } + break; + default: + for (Listener l : list) { + l.onEntityChange(this, entity, event, cacheKey); + } + break; } - break; + + logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey); } - - logger.debug("Done broadcasting metadata change: entity=" + entity + ", event=" + event + ", cacheKey=" + cacheKey); } /**