http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java index d594873..2de8527 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java @@ -44,9 +44,10 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.common.restclient.Broadcaster; -import org.apache.kylin.common.restclient.CaseInsensitiveStringCache; import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.metadata.cachesync.Broadcaster; +import org.apache.kylin.metadata.cachesync.Broadcaster.Event; +import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,8 +74,26 @@ public class KafkaConfigManager { private KafkaConfigManager(KylinConfig config) throws IOException { this.config = config; - this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, Broadcaster.TYPE.KAFKA); + this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, "kafka"); + + // touch lower level metadata before registering my listener reloadAllKafkaConfig(); + Broadcaster.getInstance(config).registerListener(new KafkaSyncListener(), "kafka"); + } + + private class KafkaSyncListener extends Broadcaster.Listener { + @Override + public void onClearAll(Broadcaster broadcaster) throws IOException { + clearCache(); + } + + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException { + if (event == Event.DROP) + removeKafkaConfigLocal(cacheKey); + else + reloadKafkaConfigLocal(cacheKey); + } } private ResourceStore getStore() { @@ -199,6 +218,10 @@ public class KafkaConfigManager { kafkaMap.remove(kafkaConfig.getName()); } + private void removeKafkaConfigLocal(String name) { + kafkaMap.remove(name); + } + private void reloadAllKafkaConfig() throws IOException { ResourceStore store = getStore(); logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
http://git-wip-us.apache.org/repos/asf/kylin/blob/72005ea5/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 3066fb5..52aa7ea 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -44,7 +44,6 @@ import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.Dictionary; @@ -57,6 +56,7 @@ import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.dict.lookup.SnapshotManager; import org.apache.kylin.dict.lookup.SnapshotTable; import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TableDesc; @@ -564,7 +564,7 @@ public class CubeMigrationCLI { RestClient restClient = new RestClient(node); try { logger.info("update meta cache for " + node); - restClient.wipeCache(Broadcaster.TYPE.ALL.getType(), Broadcaster.EVENT.UPDATE.getType(), "all"); + restClient.wipeCache(Broadcaster.SYNC_ALL, Broadcaster.Event.UPDATE.getType(), Broadcaster.SYNC_ALL); } catch (IOException e) { logger.error(e.getMessage()); }