KYLIN-2619 refine Broadcaster
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/87d5d8db Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/87d5d8db Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/87d5d8db Branch: refs/heads/master Commit: 87d5d8db276b590c86c473db5997414285b2d689 Parents: 3fbf90a Author: shaofengshi <shaofeng...@apache.org> Authored: Sat May 27 14:27:39 2017 +0800 Committer: hongbin ma <m...@kyligence.io> Committed: Sat May 27 16:20:07 2017 +0800 ---------------------------------------------------------------------- .../kylin/common/restclient/RestClient.java | 10 +++---- .../kylin/metadata/cachesync/Broadcaster.java | 28 +++++++++++--------- 2 files changed, 18 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/87d5d8db/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java index fc34a6b..13490cb 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java @@ -33,7 +33,6 @@ import org.apache.http.HttpResponse; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; @@ -119,19 +118,16 @@ public class RestClient { } public void wipeCache(String entity, String event, String cacheKey) throws IOException { - wipeCache(client, baseUrl, entity, event, cacheKey); - } - - public static void wipeCache(HttpClient client, String baseUrl, String entity, String event, String cacheKey) throws IOException { String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event; HttpPut request = new HttpPut(url); try { HttpResponse response = client.execute(request); - String msg = EntityUtils.toString(response.getEntity()); - if (response.getStatusLine().getStatusCode() != 200) + if (response.getStatusLine().getStatusCode() != 200) { + String msg = EntityUtils.toString(response.getEntity()); throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with cache wipe url " + url + "\n" + msg); + } } catch (Exception ex) { throw new IOException(ex); } finally { http://git-wip-us.apache.org/repos/asf/kylin/blob/87d5d8db/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 35d2f42..4a8c6d3 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -29,14 +29,12 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; -import org.apache.http.client.HttpClient; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.http.params.BasicHttpParams; -import org.apache.http.params.HttpConnectionParams; -import org.apache.http.params.HttpParams; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.DaemonThreadFactory; @@ -115,23 +113,27 @@ public class Broadcaster { Executors.newSingleThreadExecutor(new DaemonThreadFactory()).execute(new Runnable() { @Override public void run() { - final HttpParams httpParams = new BasicHttpParams(); - HttpConnectionParams.setConnectionTimeout(httpParams, 3000); + final Map<String, RestClient> restClientMap = Maps.newHashMap(); + final ExecutorService wipingCachePool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - final HttpClient client = new DefaultHttpClient(httpParams); - - final ExecutorService wipingCachePool = Executors.newFixedThreadPool(3, new DaemonThreadFactory()); while (true) { try { final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); - logger.debug("Servers in the cluster: " + Arrays.toString(config.getRestServers())); + String[] restServers = config.getRestServers(); + logger.info("Servers in the cluster: " + Arrays.toString(restServers)); + for (final String node : restServers) { + if (restClientMap.containsKey(node) == false) { + restClientMap.put(node, new RestClient(node)); + } + } + logger.info("Announcing new broadcast event: " + broadcastEvent); - for (final String address : config.getRestServers()) { + for (final String node : restServers) { wipingCachePool.execute(new Runnable() { @Override public void run() { try { - RestClient.wipeCache(client, RestClient.SCHEME_HTTP + address + RestClient.KYLIN_API_PATH, broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey()); + restClientMap.get(node).wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey()); } catch (IOException e) { logger.warn("Thread failed during wipe cache at " + broadcastEvent, e); }