KYLIN-2716: replace non-thread-safe WeakHashMap with Guava Cache for htableInfoCache in CubeService

Signed-off-by: Li Yang <liy...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/80739578
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/80739578
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/80739578

Branch: refs/heads/master
Commit: 8073957873e114a2a1152f73e4942be66730761e
Parents: bdb3a5f
Author: Zhong <nju_y...@apache.org>
Authored: Wed Aug 2 16:27:39 2017 +0800
Committer: Li Yang <liy...@apache.org>
Committed: Sun Aug 20 17:30:48 2017 +0800

----------------------------------------------------------------------
 .../kylin/rest/controller/CubeController.java   |  3 +-
 .../rest/controller2/CubeControllerV2.java      |  3 +-
 .../apache/kylin/rest/service/CubeService.java  | 99 +++++++++++++++-----
 3 files changed, 81 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/80739578/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a370292..a2cf0fb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.JsonUtil;
@@ -553,7 +554,7 @@ public class CubeController extends BasicController {
             // Get info of given table.
             try {
                 hr = cubeService.getHTableInfo(tableName);
-            } catch (IOException e) {
+            } catch (IOException | ExecutionException e) {
                 logger.error("Failed to calcuate size of HTable \"" + 
tableName + "\".", e);
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/80739578/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
index 9ffc062..aba2cf9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -445,7 +446,7 @@ public class CubeControllerV2 extends BasicController {
             // Get info of given table.
             try {
                 hr = cubeService.getHTableInfo(tableName);
-            } catch (IOException e) {
+            } catch (IOException | ExecutionException e) {
                 logger.error("Failed to calculate size of HTable \"" + 
tableName + "\".", e);
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/80739578/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index aa42cb0..c32133d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -24,7 +24,8 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.WeakHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -41,6 +42,7 @@ import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.draft.Draft;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -52,6 +54,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.ForbiddenException;
+import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.request.MetricsRequest;
@@ -61,6 +64,7 @@ import org.apache.kylin.rest.security.AclPermission;
 import org.apache.kylin.rest.util.AclUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.security.access.AccessDeniedException;
@@ -69,6 +73,11 @@ import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Lists;
 
 /**
@@ -77,13 +86,13 @@ import com.google.common.collect.Lists;
  * @author yangli9
  */
 @Component("cubeMgmtService")
-public class CubeService extends BasicService {
+public class CubeService extends BasicService implements InitializingBean {
 
     private static final Logger logger = 
LoggerFactory.getLogger(CubeService.class);
 
     public static final char[] VALID_CUBENAME = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_".toCharArray();
 
-    private WeakHashMap<String, HBaseResponse> htableInfoCache = new 
WeakHashMap<>();
+    private static final int CACHE_MAX_SIZE = 5000;
 
     @Autowired
     @Qualifier("accessService")
@@ -100,6 +109,69 @@ public class CubeService extends BasicService {
     @Autowired
     private AclUtil aclUtil;
 
+    /*
+    * (non-Javadoc)
+    *
+    * @see
+    * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
+    */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        Broadcaster.getInstance(getConfig()).registerListener(new 
HTableInfoSyncListener(), "cube");
+    }
+
+    private final LoadingCache<String, HBaseResponse> htableInfoCache = 
CacheBuilder.newBuilder()
+            .maximumSize(CACHE_MAX_SIZE).expireAfterAccess(7, TimeUnit.DAYS)
+            .removalListener(new RemovalListener<String, HBaseResponse>() {
+                @Override
+                public void onRemoval(RemovalNotification<String, 
HBaseResponse> notification) {
+                    logger.info("Hbase table: " + notification.getKey() + " 
cache is removed due to "
+                            + notification.getCause());
+                }
+            }).build(new CacheLoader<String, HBaseResponse>() {
+                @Override
+                public HBaseResponse load(String tableName) throws Exception {
+                    HBaseResponse hr = new HBaseResponse();
+                    if 
("hbase".equals(getConfig().getMetadataUrl().getScheme())) {
+                        try {
+                            // use reflection to isolate NoClassDef errors 
when HBase is not available
+                            hr = (HBaseResponse) 
Class.forName("org.apache.kylin.rest.service.HBaseInfoUtil")//
+                                    .getMethod("getHBaseInfo", new Class[] { 
String.class, String.class })//
+                                    .invoke(null, new Object[] { tableName, 
getConfig().getStorageUrl() });
+                        } catch (Throwable e) {
+                            throw new IOException(e);
+                        }
+                    }
+                    return hr;
+                }
+            });
+
+    private class HTableInfoSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            htableInfoCache.invalidateAll();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Broadcaster.Event event, String cacheKey)
+                throws IOException {
+            String cubeName = cacheKey;
+
+            CubeInstance cube = getCubeManager().getCube(cubeName);
+            if (null == cube) {
+                throw new InternalErrorException("Cannot find cube " + 
cubeName);
+            }
+
+            List<String> htableNameList = 
Lists.newArrayListWithExpectedSize(cube.getSegments().size());
+            for (CubeSegment segment : cube.getSegments()) {
+                htableNameList.add(segment.getStorageLocationIdentifier());
+            }
+
+            htableInfoCache.invalidateAll(htableNameList);
+        }
+    }
+
     @PostFilter(Constant.ACCESS_POST_FILTER_READ)
     public List<CubeInstance> listAllCubes(final String cubeName, final String 
projectName, final String modelName, boolean exactMatch) {
         List<CubeInstance> cubeInstances = null;
@@ -418,25 +490,8 @@ public class CubeService extends BasicService {
      * if error happens
      * @throws IOException Exception when HTable resource is not closed 
correctly.
      */
-    public HBaseResponse getHTableInfo(String tableName) throws IOException {
-        if (htableInfoCache.containsKey(tableName)) {
-            return htableInfoCache.get(tableName);
-        }
-
-        HBaseResponse hr = new HBaseResponse();
-        if ("hbase".equals(getConfig().getMetadataUrl().getScheme())) {
-            try {
-                // use reflection to isolate NoClassDef errors when HBase is 
not available
-                hr = (HBaseResponse) 
Class.forName("org.apache.kylin.rest.service.HBaseInfoUtil")//
-                        .getMethod("getHBaseInfo", new Class[] { String.class, 
String.class })//
-                        .invoke(null, new Object[] { tableName, 
this.getConfig().getStorageUrl() });
-            } catch (Throwable e) {
-                throw new IOException(e);
-            }
-        }
-
-        htableInfoCache.put(tableName, hr);
-        return hr;
+    public HBaseResponse getHTableInfo(String tableName) throws IOException, 
ExecutionException {
+        return htableInfoCache.get(tableName);
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN

Reply via email to