KYLIN-2716: replace non-thread-safe WeakHashMap with Guava Cache for htableInfoCache in CubeService
Signed-off-by: Li Yang <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/80739578 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/80739578 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/80739578 Branch: refs/heads/master Commit: 8073957873e114a2a1152f73e4942be66730761e Parents: bdb3a5f Author: Zhong <nju_y...@apache.org> Authored: Wed Aug 2 16:27:39 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Sun Aug 20 17:30:48 2017 +0800 ---------------------------------------------------------------------- .../kylin/rest/controller/CubeController.java | 3 +- .../rest/controller2/CubeControllerV2.java | 3 +- .../apache/kylin/rest/service/CubeService.java | 99 +++++++++++++++----- 3 files changed, 81 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/80739578/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index a370292..a2cf0fb 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.JsonUtil; @@ -553,7 +554,7 @@ public class CubeController extends BasicController { // Get info of given table. try { hr = cubeService.getHTableInfo(tableName); - } catch (IOException e) { + } catch (IOException | ExecutionException e) { logger.error("Failed to calcuate size of HTable \"" + tableName + "\".", e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/80739578/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java index 9ffc062..aba2cf9 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller2/CubeControllerV2.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -445,7 +446,7 @@ public class CubeControllerV2 extends BasicController { // Get info of given table. try { hr = cubeService.getHTableInfo(tableName); - } catch (IOException e) { + } catch (IOException | ExecutionException e) { logger.error("Failed to calculate size of HTable \"" + tableName + "\".", e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/80739578/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index aa42cb0..c32133d 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -24,7 +24,8 @@ import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.List; -import java.util.WeakHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -41,6 +42,7 @@ import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.draft.Draft; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; @@ -52,6 +54,7 @@ import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.exception.ForbiddenException; +import org.apache.kylin.rest.exception.InternalErrorException; import org.apache.kylin.rest.msg.Message; import org.apache.kylin.rest.msg.MsgPicker; import org.apache.kylin.rest.request.MetricsRequest; @@ -61,6 +64,7 @@ import org.apache.kylin.rest.security.AclPermission; import org.apache.kylin.rest.util.AclUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.security.access.AccessDeniedException; @@ -69,6 +73,11 @@ import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.collect.Lists; /** @@ -77,13 +86,13 @@ import com.google.common.collect.Lists; * @author yangli9 */ @Component("cubeMgmtService") -public class CubeService extends BasicService { +public class CubeService extends BasicService implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(CubeService.class); public static final char[] VALID_CUBENAME = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_".toCharArray(); - private WeakHashMap<String, HBaseResponse> htableInfoCache = new WeakHashMap<>(); + private static final int CACHE_MAX_SIZE = 5000; @Autowired @Qualifier("accessService") @@ -100,6 +109,69 @@ public class CubeService extends BasicService { @Autowired private AclUtil aclUtil; + /* + * (non-Javadoc) + * + * @see + * org.springframework.beans.factory.InitializingBean#afterPropertiesSet() + */ + @SuppressWarnings("unchecked") + @Override + public void afterPropertiesSet() throws Exception { + Broadcaster.getInstance(getConfig()).registerListener(new HTableInfoSyncListener(), "cube"); + } + + private final LoadingCache<String, HBaseResponse> htableInfoCache = CacheBuilder.newBuilder() + .maximumSize(CACHE_MAX_SIZE).expireAfterAccess(7, TimeUnit.DAYS) + .removalListener(new RemovalListener<String, HBaseResponse>() { + @Override + public void onRemoval(RemovalNotification<String, HBaseResponse> notification) { + logger.info("Hbase table: " + notification.getKey() + " cache is removed due to " + + notification.getCause()); + } + }).build(new CacheLoader<String, HBaseResponse>() { + @Override + public HBaseResponse load(String tableName) throws Exception { + HBaseResponse hr = new HBaseResponse(); + if ("hbase".equals(getConfig().getMetadataUrl().getScheme())) { + try { + // use reflection to isolate NoClassDef errors when HBase is not available + hr = (HBaseResponse) Class.forName("org.apache.kylin.rest.service.HBaseInfoUtil")// + .getMethod("getHBaseInfo", new Class[] { String.class, String.class })// + .invoke(null, new Object[] { tableName, getConfig().getStorageUrl() }); + } catch (Throwable e) { + throw new IOException(e); + } + } + return hr; + } + }); + + private class HTableInfoSyncListener extends Broadcaster.Listener { + @Override + public void onClearAll(Broadcaster broadcaster) throws IOException { + htableInfoCache.invalidateAll(); + } + + @Override + public void onEntityChange(Broadcaster broadcaster, String entity, Broadcaster.Event event, String cacheKey) + throws IOException { + String cubeName = cacheKey; + + CubeInstance cube = getCubeManager().getCube(cubeName); + if (null == cube) { + throw new InternalErrorException("Cannot find cube " + cubeName); + } + + List<String> htableNameList = Lists.newArrayListWithExpectedSize(cube.getSegments().size()); + for (CubeSegment segment : cube.getSegments()) { + htableNameList.add(segment.getStorageLocationIdentifier()); + } + + htableInfoCache.invalidateAll(htableNameList); + } + } + @PostFilter(Constant.ACCESS_POST_FILTER_READ) public List<CubeInstance> listAllCubes(final String cubeName, final String projectName, final String modelName, boolean exactMatch) { List<CubeInstance> cubeInstances = null; @@ -418,25 +490,8 @@ public class CubeService extends BasicService { * if error happens * @throws IOException Exception when HTable resource is not closed correctly. */ - public HBaseResponse getHTableInfo(String tableName) throws IOException { - if (htableInfoCache.containsKey(tableName)) { - return htableInfoCache.get(tableName); - } - - HBaseResponse hr = new HBaseResponse(); - if ("hbase".equals(getConfig().getMetadataUrl().getScheme())) { - try { - // use reflection to isolate NoClassDef errors when HBase is not available - hr = (HBaseResponse) Class.forName("org.apache.kylin.rest.service.HBaseInfoUtil")// - .getMethod("getHBaseInfo", new Class[] { String.class, String.class })// - .invoke(null, new Object[] { tableName, this.getConfig().getStorageUrl() }); - } catch (Throwable e) { - throw new IOException(e); - } - } - - htableInfoCache.put(tableName, hr); - return hr; + public HBaseResponse getHTableInfo(String tableName) throws IOException, ExecutionException { + return htableInfoCache.get(tableName); } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN