KYLIN-2619 refine Broadcaster

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/87d5d8db
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/87d5d8db
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/87d5d8db

Branch: refs/heads/master
Commit: 87d5d8db276b590c86c473db5997414285b2d689
Parents: 3fbf90a
Author: shaofengshi <shaofeng...@apache.org>
Authored: Sat May 27 14:27:39 2017 +0800
Committer: hongbin ma <m...@kyligence.io>
Committed: Sat May 27 16:20:07 2017 +0800

----------------------------------------------------------------------
 .../kylin/common/restclient/RestClient.java     | 10 +++----
 .../kylin/metadata/cachesync/Broadcaster.java   | 28 +++++++++++---------
 2 files changed, 18 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/87d5d8db/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java 
b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index fc34a6b..13490cb 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -33,7 +33,6 @@ import org.apache.http.HttpResponse;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
@@ -119,19 +118,16 @@ public class RestClient {
     }
 
     public void wipeCache(String entity, String event, String cacheKey) throws 
IOException {
-        wipeCache(client, baseUrl, entity, event, cacheKey);
-    }
-
-    public static void wipeCache(HttpClient client, String baseUrl, String 
entity, String event, String cacheKey) throws IOException {
         String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + 
event;
         HttpPut request = new HttpPut(url);
 
         try {
             HttpResponse response = client.execute(request);
-            String msg = EntityUtils.toString(response.getEntity());
 
-            if (response.getStatusLine().getStatusCode() != 200)
+            if (response.getStatusLine().getStatusCode() != 200) {
+                String msg = EntityUtils.toString(response.getEntity());
                 throw new IOException("Invalid response " + 
response.getStatusLine().getStatusCode() + " with cache wipe url " + url + "\n" 
+ msg);
+            }
         } catch (Exception ex) {
             throw new IOException(ex);
         } finally {

http://git-wip-us.apache.org/repos/asf/kylin/blob/87d5d8db/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 35d2f42..4a8c6d3 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -29,14 +29,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.http.client.HttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.DaemonThreadFactory;
@@ -115,23 +113,27 @@ public class Broadcaster {
         Executors.newSingleThreadExecutor(new 
DaemonThreadFactory()).execute(new Runnable() {
             @Override
             public void run() {
-                final HttpParams httpParams = new BasicHttpParams();
-                HttpConnectionParams.setConnectionTimeout(httpParams, 3000);
+                final Map<String, RestClient> restClientMap = 
Maps.newHashMap();
+                final ExecutorService wipingCachePool = new 
ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>());
 
-                final HttpClient client = new DefaultHttpClient(httpParams);
-
-                final ExecutorService wipingCachePool = 
Executors.newFixedThreadPool(3, new DaemonThreadFactory());
                 while (true) {
                     try {
                         final BroadcastEvent broadcastEvent = 
broadcastEvents.takeFirst();
-                        logger.debug("Servers in the cluster: " + 
Arrays.toString(config.getRestServers()));
+                        String[] restServers = config.getRestServers();
+                        logger.info("Servers in the cluster: " + 
Arrays.toString(restServers));
+                        for (final String node : restServers) {
+                            if (restClientMap.containsKey(node) == false) {
+                                restClientMap.put(node, new RestClient(node));
+                            }
+                        }
+
                         logger.info("Announcing new broadcast event: " + 
broadcastEvent);
-                        for (final String address : config.getRestServers()) {
+                        for (final String node : restServers) {
                             wipingCachePool.execute(new Runnable() {
                                 @Override
                                 public void run() {
                                     try {
-                                        RestClient.wipeCache(client, 
RestClient.SCHEME_HTTP + address + RestClient.KYLIN_API_PATH,  
broadcastEvent.getEntity(), broadcastEvent.getEvent(), 
broadcastEvent.getCacheKey());
+                                        
restClientMap.get(node).wipeCache(broadcastEvent.getEntity(), 
broadcastEvent.getEvent(), broadcastEvent.getCacheKey());
                                     } catch (IOException e) {
                                         logger.warn("Thread failed during wipe 
cache at " + broadcastEvent, e);
                                     }

Reply via email to