KYLIN-3158 Retry the failed metadata sync event on the failed node only
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/baf7133d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/baf7133d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/baf7133d Branch: refs/heads/master Commit: baf7133dabd8f1e53f5235b61da2bc5da334d870 Parents: 0fce61c Author: Li Yang <liy...@apache.org> Authored: Fri Jan 12 21:46:47 2018 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Jan 26 22:54:58 2018 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../kylin/metadata/cachesync/Broadcaster.java | 127 ++++++++++++++----- .../metadata/cachesync/SingleValueCache.java | 6 +- .../metadata/cachesync/BroadcasterTest.java | 43 +++++++ .../kylin/rest/init/InitialTaskManager.java | 2 +- .../apache/kylin/rest/service/CacheService.java | 4 +- 6 files changed, 151 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index b053daa..5045b2f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -303,6 +303,10 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.metadata.sync-retries", "3")); } + public String getCacheSyncErrorHandler() { + return getOptional("kylin.metadata.sync-error-handler"); + } + // for test only public void setMetadataUrl(String metadataUrl) { setProperty("kylin.metadata.url", metadataUrl); 
http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 05910ea..6462a27 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.RestClient; +import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.DaemonThreadFactory; import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; @@ -68,7 +69,7 @@ public class Broadcaster { public static Broadcaster getInstance(KylinConfig config) { return config.getManager(Broadcaster.class); } - + // called by reflection static Broadcaster newInstance(KylinConfig config) { return new Broadcaster(config); @@ -79,13 +80,19 @@ public class Broadcaster { static final Map<String, List<Listener>> staticListenerMap = Maps.newConcurrentMap(); private KylinConfig config; + private ExecutorService announceMainLoop; + private ExecutorService announceThreadPool; + private SyncErrorHandler syncErrorHandler; private BlockingDeque<BroadcastEvent> broadcastEvents = new LinkedBlockingDeque<>(); private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap(); - private AtomicLong counter = new AtomicLong(); - + private AtomicLong counter = new AtomicLong(); // a counter for testing purpose + private Broadcaster(final KylinConfig config) { this.config = config; - final int retryLimitTimes = config.getCacheSyncRetrys(); + this.syncErrorHandler = 
getSyncErrorHandler(config); + this.announceMainLoop = Executors.newSingleThreadExecutor(new DaemonThreadFactory()); + this.announceThreadPool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory()); final String[] nodes = config.getRestServers(); if (nodes == null || nodes.length < 1) { @@ -93,21 +100,14 @@ public class Broadcaster { } logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); - Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() { + announceMainLoop.execute(new Runnable() { @Override public void run() { final Map<String, RestClient> restClientMap = Maps.newHashMap(); - final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory()); - while (true) { + while (!announceThreadPool.isShutdown()) { try { final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); - broadcastEvent.setRetryTime(broadcastEvent.getRetryTime() + 1); - if (broadcastEvent.getRetryTime() > retryLimitTimes) { - logger.info("broadcastEvent retry up to limit times, broadcastEvent:{}", broadcastEvent); - continue; - } String[] restServers = config.getRestServers(); logger.debug("Servers in the cluster: " + Arrays.toString(restServers)); @@ -117,25 +117,27 @@ public class Broadcaster { } } - logger.debug("Announcing new broadcast event: " + broadcastEvent); + String toWhere = broadcastEvent.getTargetNode(); + if (toWhere == null) + toWhere = "all"; + logger.debug("Announcing new broadcast to " + toWhere + ": " + broadcastEvent); + for (final String node : restServers) { - wipingCachePool.execute(new Runnable() { + if (!(toWhere.equals("all") || toWhere.equals(node))) + continue; + + announceThreadPool.execute(new Runnable() { @Override public void run() { + RestClient restClient = restClientMap.get(node); try { - 
restClientMap.get(node).wipeCache(broadcastEvent.getEntity(), - broadcastEvent.getEvent(), broadcastEvent.getCacheKey()); + restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), + broadcastEvent.getCacheKey()); } catch (IOException e) { - logger.warn("Thread failed during wipe cache at {}, error msg: {}", - broadcastEvent, e); - // when sync failed, put back to queue - try { - broadcastEvents.putLast(broadcastEvent); - } catch (InterruptedException ex) { - logger.warn( - "error reentry failed broadcastEvent to queue, broacastEvent:{}, error: {} ", - broadcastEvent, ex); - } + logger.error( + "Announce broadcast event failed, targetNode {} broadcastEvent {}, error msg: {}", + node, broadcastEvent, e); + syncErrorHandler.handleAnnounceError(node, restClient, broadcastEvent); } } }); @@ -148,6 +150,23 @@ public class Broadcaster { }); } + private SyncErrorHandler getSyncErrorHandler(KylinConfig config) { + String clzName = config.getCacheSyncErrorHandler(); + if (StringUtils.isEmpty(clzName)) { + clzName = DefaultSyncErrorHandler.class.getName(); + } + return (SyncErrorHandler) ClassUtil.newInstance(clzName); + } + + public KylinConfig getConfig() { + return config; + } + + public void stopAnnounce() { + announceThreadPool.shutdown(); + announceMainLoop.shutdown(); + } + // static listener survives cache wipe and goes after normal listeners public void registerStaticListener(Listener listener, String... 
entities) { doRegisterListener(staticListenerMap, listener, entities); @@ -263,15 +282,19 @@ public class Broadcaster { } /** - * Broadcast an event out + * Announce an event out to peer kylin servers */ - public void queue(String entity, String event, String key) { + public void announce(String entity, String event, String key) { + announce(new BroadcastEvent(entity, event, key)); + } + + public void announce(BroadcastEvent event) { if (broadcastEvents == null) return; try { counter.incrementAndGet(); - broadcastEvents.putLast(new BroadcastEvent(entity, event, key)); + broadcastEvents.putLast(event); } catch (Exception e) { counter.decrementAndGet(); logger.error("error putting BroadcastEvent", e); @@ -282,6 +305,40 @@ public class Broadcaster { return counter.getAndSet(0); } + // ============================================================================ + + public static class DefaultSyncErrorHandler implements SyncErrorHandler { + Broadcaster broadcaster; + int maxRetryTimes; + + @Override + public void init(Broadcaster broadcaster) { + this.maxRetryTimes = broadcaster.getConfig().getCacheSyncRetrys(); + this.broadcaster = broadcaster; + } + + @Override + public void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event) { + int retry = event.getRetryTime() + 1; + + // when sync failed, put back to queue to retry + if (retry < maxRetryTimes) { + event.setRetryTime(retry); + event.setTargetNode(targetNode); + broadcaster.announce(event); + } else { + logger.error("Announce broadcast event exceeds retry limit, abandon targetNode {} broadcastEvent {}", + targetNode, event); + } + } + } + + public interface SyncErrorHandler { + void init(Broadcaster broadcaster); + + void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event); + } + public enum Event { CREATE("create"), UPDATE("update"), DROP("drop"); @@ -326,6 +383,8 @@ public class Broadcaster { public static class BroadcastEvent { private int retryTime; 
+ private String targetNode; // NULL means to all + private String entity; private String event; private String cacheKey; @@ -345,6 +404,14 @@ public class Broadcaster { this.retryTime = retryTime; } + public String getTargetNode() { + return targetNode; + } + + public void setTargetNode(String targetNode) { + this.targetNode = targetNode; + } + public String getEntity() { return entity; } http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java index 4bfaeae..f803c8b 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/SingleValueCache.java @@ -49,9 +49,9 @@ public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> { innerCache.put(key, value); if (!exists) { - getBroadcaster().queue(syncEntity, Broadcaster.Event.CREATE.getType(), key.toString()); + getBroadcaster().announce(syncEntity, Broadcaster.Event.CREATE.getType(), key.toString()); } else { - getBroadcaster().queue(syncEntity, Broadcaster.Event.UPDATE.getType(), key.toString()); + getBroadcaster().announce(syncEntity, Broadcaster.Event.UPDATE.getType(), key.toString()); } } @@ -65,7 +65,7 @@ public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> { innerCache.remove(key); if (exists) { - getBroadcaster().queue(syncEntity, Broadcaster.Event.DROP.getType(), key.toString()); + getBroadcaster().announce(syncEntity, Broadcaster.Event.DROP.getType(), key.toString()); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java 
---------------------------------------------------------------------- diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java index 80f26f9..762512f 100644 --- a/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/metadata/cachesync/BroadcasterTest.java @@ -21,9 +21,12 @@ package org.apache.kylin.metadata.cachesync; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.cachesync.Broadcaster.BroadcastEvent; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; import org.apache.kylin.metadata.cachesync.Broadcaster.Listener; +import org.apache.kylin.metadata.cachesync.Broadcaster.SyncErrorHandler; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -64,6 +67,7 @@ public class BroadcasterTest extends LocalFileMetadataTestCase { broadcaster.notifyListener("test", Event.UPDATE, ""); + broadcaster.stopAnnounce(); Broadcaster.staticListenerMap.clear(); } @@ -90,6 +94,45 @@ public class BroadcasterTest extends LocalFileMetadataTestCase { broadcaster.notifyNonStaticListener("test", Event.UPDATE, ""); + broadcaster.stopAnnounce(); Broadcaster.staticListenerMap.clear(); } + + @Test + public void testAnnounceErrorHandler() throws IOException, InterruptedException { + System.setProperty("kylin.server.cluster-servers", "localhost:717"); + System.setProperty("kylin.metadata.sync-error-handler", MockupErrHandler.class.getName()); + try { + Broadcaster broadcaster = Broadcaster.getInstance(getTestConfig()); + + broadcaster.announce("all", "update", "all"); + + for (int i = 0; i < 30 && MockupErrHandler.atom.get() == 0; i++) { + Thread.sleep(1000); + } + + 
broadcaster.stopAnnounce(); + Broadcaster.staticListenerMap.clear(); + } finally { + System.clearProperty("kylin.server.cluster-servers"); + System.clearProperty("kylin.metadata.sync-error-handler"); + } + + Assert.assertTrue(MockupErrHandler.atom.get() > 0); + } + + public static class MockupErrHandler implements SyncErrorHandler { + static AtomicInteger atom = new AtomicInteger(); + + @Override + public void init(Broadcaster broadcaster) { + } + + @Override + public void handleAnnounceError(String targetNode, RestClient restClient, BroadcastEvent event) { + Assert.assertEquals("localhost:717", targetNode); + atom.incrementAndGet(); + } + + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java index 14052ce..467ef82 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java +++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java @@ -53,7 +53,7 @@ public class InitialTaskManager implements InitializingBean { for (String taskClass : taskClasses) { try { InitialTask task = (InitialTask) Class.forName(taskClass).newInstance(); - logger.info("Running task: " + taskClass); + logger.info("Running initial task: " + taskClass); task.execute(); } catch (Throwable e) { logger.error("Initial task failed: " + taskClass, e); http://git-wip-us.apache.org/repos/asf/kylin/blob/baf7133d/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java index 
98e06e0..b61309e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -49,6 +50,7 @@ public class CacheService extends BasicService implements InitializingBean { @Override public void onClearAll(Broadcaster broadcaster) throws IOException { cleanAllDataCache(); + HBaseConnection.clearConnCache(); // take the chance to clear HBase connection cache as well } @Override @@ -104,7 +106,7 @@ public class CacheService extends BasicService implements InitializingBean { public void annouceWipeCache(String entity, String event, String cacheKey) { Broadcaster broadcaster = Broadcaster.getInstance(getConfig()); - broadcaster.queue(entity, event, cacheKey); + broadcaster.announce(entity, event, cacheKey); } public void notifyMetadataChange(String entity, Event event, String cacheKey) throws IOException {