Broadcaster should allow a dynamic REST server list
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3fbf90ae Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3fbf90ae Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3fbf90ae Branch: refs/heads/master Commit: 3fbf90aed1bd78c1f44f9fd1f37fc34ffa704762 Parents: eafbe73 Author: shaofengshi <[email protected]> Authored: Fri May 26 17:55:36 2017 +0800 Committer: hongbin ma <[email protected]> Committed: Sat May 27 16:20:07 2017 +0800 ---------------------------------------------------------------------- .../kylin/common/restclient/RestClient.java | 11 +++++++++- .../kylin/metadata/cachesync/Broadcaster.java | 23 ++++++++++++-------- 2 files changed, 24 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/3fbf90ae/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java index 33a4e7a..fc34a6b 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java @@ -33,6 +33,7 @@ import org.apache.http.HttpResponse; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; @@ -65,6 +66,10 @@ public class RestClient { private static final int HTTP_CONNECTION_TIMEOUT_MS = 30000; private static final int HTTP_SOCKET_TIMEOUT_MS = 120000; + public static final String 
SCHEME_HTTP = "http://"; + + public static final String KYLIN_API_PATH = "/kylin/api"; + public static boolean matchFullRestPattern(String uri) { Matcher m = fullRestPattern.matcher(uri); return m.matches(); @@ -97,7 +102,7 @@ public class RestClient { this.port = port; this.userName = userName; this.password = password; - this.baseUrl = "http://" + host + ":" + port + "/kylin/api"; + this.baseUrl = SCHEME_HTTP + host + ":" + port + KYLIN_API_PATH; final HttpParams httpParams = new BasicHttpParams(); HttpConnectionParams.setSoTimeout(httpParams, HTTP_SOCKET_TIMEOUT_MS); @@ -114,6 +119,10 @@ public class RestClient { } public void wipeCache(String entity, String event, String cacheKey) throws IOException { + wipeCache(client, baseUrl, entity, event, cacheKey); + } + + public static void wipeCache(HttpClient client, String baseUrl, String entity, String event, String cacheKey) throws IOException { String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event; HttpPut request = new HttpPut(url); http://git-wip-us.apache.org/repos/asf/kylin/blob/3fbf90ae/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 1394f7b..35d2f42 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -32,6 +32,11 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; +import org.apache.http.client.HttpClient; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.HttpConnectionParams; +import 
org.apache.http.params.HttpParams; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.DaemonThreadFactory; @@ -104,29 +109,29 @@ public class Broadcaster { final String[] nodes = config.getRestServers(); if (nodes == null || nodes.length < 1) { logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config"); - broadcastEvents = null; // disable the broadcaster - return; } logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes)); Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() { @Override public void run() { - final List<RestClient> restClients = Lists.newArrayList(); - for (String node : config.getRestServers()) { - restClients.add(new RestClient(node)); - } - final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size(), new DaemonThreadFactory()); + final HttpParams httpParams = new BasicHttpParams(); + HttpConnectionParams.setConnectionTimeout(httpParams, 3000); + + final HttpClient client = new DefaultHttpClient(httpParams); + + final ExecutorService wipingCachePool = Executors.newFixedThreadPool(3, new DaemonThreadFactory()); while (true) { try { final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); + logger.debug("Servers in the cluster: " + Arrays.toString(config.getRestServers())); logger.info("Announcing new broadcast event: " + broadcastEvent); - for (final RestClient restClient : restClients) { + for (final String address : config.getRestServers()) { wipingCachePool.execute(new Runnable() { @Override public void run() { try { - restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey()); + RestClient.wipeCache(client, RestClient.SCHEME_HTTP + address + RestClient.KYLIN_API_PATH, broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey()); } catch (IOException e) { 
logger.warn("Thread failed during wipe cache at " + broadcastEvent, e); }
